diff --git a/src/nanoarrow/common/array.c b/src/nanoarrow/common/array.c index 2f0f824cf..ec7bdb667 100644 --- a/src/nanoarrow/common/array.c +++ b/src/nanoarrow/common/array.c @@ -1513,10 +1513,26 @@ static int ArrowArrayViewValidateFull(struct ArrowArrayView* array_view, NANOARROW_RETURN_NOT_OK(ArrowArrayViewValidateFull(array_view->children[i], error)); } - // Dictionary validation not implemented + // Dictionary index validation if (array_view->dictionary != NULL) { NANOARROW_RETURN_NOT_OK(ArrowArrayViewValidateFull(array_view->dictionary, error)); - // TODO: validate the indices + + // Validate that all non-null indices are within the dictionary bounds + int64_t dictionary_length = array_view->dictionary->length; + for (int64_t i = 0; i < array_view->length; i++) { + if (ArrowArrayViewIsNull(array_view, i)) { + continue; + } + + int64_t index = ArrowArrayViewGetIntUnsafe(array_view, i); + if (index < 0 || index >= dictionary_length) { + ArrowErrorSet(error, + "[%" PRId64 "] Expected dictionary index >= 0 and < %" PRId64 + " but found value %" PRId64, + i, dictionary_length, index); + return EINVAL; + } + } } return NANOARROW_OK; diff --git a/src/nanoarrow/common/array_test.cc b/src/nanoarrow/common/array_test.cc index 35a937871..b50e7266e 100644 --- a/src/nanoarrow/common/array_test.cc +++ b/src/nanoarrow/common/array_test.cc @@ -144,6 +144,60 @@ TEST(ArrayTest, ArrayTestAllocateDictionary) { ArrowArrayRelease(&array); } +TEST(ArrayTest, ArrayTestValidateDictionaryIndices) { + struct ArrowArray array; + struct ArrowSchema schema; + struct ArrowArrayView array_view; + struct ArrowError error; + + // Create a schema for dictionary-encoded int32 with string dictionary + ASSERT_EQ(ArrowSchemaInitFromType(&schema, NANOARROW_TYPE_INT32), NANOARROW_OK); + ASSERT_EQ(ArrowSchemaAllocateDictionary(&schema), NANOARROW_OK); + ASSERT_EQ(ArrowSchemaInitFromType(schema.dictionary, NANOARROW_TYPE_STRING), + NANOARROW_OK); + + // Initialize array_view from schema + 
ASSERT_EQ(ArrowArrayViewInitFromSchema(&array_view, &schema, &error), NANOARROW_OK); + + // Create a dictionary-encoded int32 array with a string dictionary + ASSERT_EQ(ArrowArrayInitFromSchema(&array, &schema, &error), NANOARROW_OK); + + // Build the array with dictionary values: ["zero", "one"] and indices [0, 1, 0] + ASSERT_EQ(ArrowArrayStartAppending(&array), NANOARROW_OK); + ASSERT_EQ(ArrowArrayAppendString(array.dictionary, "zero"_asv), NANOARROW_OK); + ASSERT_EQ(ArrowArrayAppendString(array.dictionary, "one"_asv), NANOARROW_OK); + ASSERT_EQ(ArrowArrayAppendInt(&array, 0), NANOARROW_OK); + ASSERT_EQ(ArrowArrayAppendInt(&array, 1), NANOARROW_OK); + ASSERT_EQ(ArrowArrayAppendInt(&array, 0), NANOARROW_OK); + ASSERT_EQ(ArrowArrayFinishBuildingDefault(&array, &error), NANOARROW_OK); + + // Valid indices should pass validation + ASSERT_EQ(ArrowArrayViewSetArray(&array_view, &array, &error), NANOARROW_OK); + EXPECT_EQ(ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error), + NANOARROW_OK); + + // Now modify index to be out of bounds (index 2 when dictionary has length 2) + int32_t* indices = reinterpret_cast(ArrowArrayBuffer(&array, 1)->data); + indices[1] = 2; // Out of bounds (valid range is 0-1) + ASSERT_EQ(ArrowArrayViewSetArray(&array_view, &array, &error), NANOARROW_OK); + EXPECT_EQ(ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error), + EINVAL); + EXPECT_STREQ(error.message, + "[1] Expected dictionary index >= 0 and < 2 but found value 2"); + + // Test negative index + indices[1] = -1; + ASSERT_EQ(ArrowArrayViewSetArray(&array_view, &array, &error), NANOARROW_OK); + EXPECT_EQ(ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error), + EINVAL); + EXPECT_STREQ(error.message, + "[1] Expected dictionary index >= 0 and < 2 but found value -1"); + + ArrowArrayViewReset(&array_view); + ArrowSchemaRelease(&schema); + ArrowArrayRelease(&array); +} + TEST(ArrayTest, ArrayTestInitFromSchema) { struct 
ArrowArray array; struct ArrowSchema schema; diff --git a/src/nanoarrow/ipc/decoder.c b/src/nanoarrow/ipc/decoder.c index 520219ed9..4b0239c13 100644 --- a/src/nanoarrow/ipc/decoder.c +++ b/src/nanoarrow/ipc/decoder.c @@ -295,6 +295,44 @@ ArrowErrorCode ArrowIpcDictionaryEncodingsAppend( return NANOARROW_OK; } +static ArrowErrorCode ArrowIpcDictionaryEncodingsAppendSchemaInternal( + struct ArrowIpcDictionaryEncodings* dictionary_encodings, + const struct ArrowSchema* schema, int64_t* next_id) { + if (schema->dictionary != NULL) { + struct ArrowIpcDictionaryEncoding encoding; + encoding.schema = schema; + encoding.id = (*next_id)++; + encoding.kind = NANOARROW_IPC_DICTIONARY_KIND_DENSE_ARRAY; + NANOARROW_RETURN_NOT_OK( + ArrowIpcDictionaryEncodingsAppend(dictionary_encodings, encoding)); + } + + for (int64_t i = 0; i < schema->n_children; i++) { + NANOARROW_DCHECK(schema->children != NULL && schema->children[i] != NULL); + NANOARROW_RETURN_NOT_OK(ArrowIpcDictionaryEncodingsAppendSchemaInternal( + dictionary_encodings, schema->children[i], next_id)); + } + + if (schema->dictionary != NULL) { + NANOARROW_RETURN_NOT_OK(ArrowIpcDictionaryEncodingsAppendSchemaInternal( + dictionary_encodings, schema->dictionary, next_id)); + } + + return NANOARROW_OK; +} + +ArrowErrorCode ArrowIpcDictionaryEncodingsAppendSchema( + struct ArrowIpcDictionaryEncodings* dictionary_encodings, + const struct ArrowSchema* schema) { + NANOARROW_DCHECK(dictionary_encodings != NULL); + + int64_t next_id = 0; + NANOARROW_RETURN_NOT_OK(ArrowIpcDictionaryEncodingsAppendSchemaInternal( + dictionary_encodings, schema, &next_id)); + + return NANOARROW_OK; +} + const struct ArrowIpcDictionaryEncoding* ArrowIpcDictionaryEncodingsFind( const struct ArrowIpcDictionaryEncodings* dictionary_encodings, const struct ArrowSchema* schema) { @@ -414,12 +452,13 @@ static ArrowErrorCode ArrowIpcDictionaryReplace(struct ArrowIpcDictionary* dicti static ArrowErrorCode ArrowIpcDictionaryAppend(struct 
ArrowIpcDictionary* dictionary, struct ArrowArray* value, struct ArrowError* error) { - if (dictionary->current_value.release != NULL) { + if (dictionary->current_value.release != NULL && + dictionary->current_value.length != 0) { ArrowErrorSet(error, "Dictionary concatenation is not yet supported"); return ENOTSUP; } - ArrowArrayMove(value, &dictionary->current_value); + NANOARROW_RETURN_NOT_OK(ArrowIpcDictionaryReplace(dictionary, value, error)); return NANOARROW_OK; } @@ -492,7 +531,24 @@ static ArrowErrorCode ArrowIpcDictionariesInitDictionaries( return result; } + // Set the ID dictionary->id = unique_ids[i]; + + // Set the initial array value to a valid array with zero length. This is + // needed because empty and/or all null columns may not have a dictionary + // message emitted before a record batch arrives. + result = ArrowArrayInitFromSchema(&dictionary->current_value, + encoding->schema->dictionary, error); + if (result != NANOARROW_OK) { + *num_initialized_decoders_out = i + 1; + return result; + } + + result = ArrowArrayFinishBuildingDefault(&dictionary->current_value, error); + if (result != NANOARROW_OK) { + *num_initialized_decoders_out = i + 1; + return result; + } } *num_initialized_decoders_out = private_data->num_dictionaries; @@ -1203,78 +1259,15 @@ static int ArrowIpcDecoderSetType(struct ArrowSchema* schema, ns(Field_table_t) } } -// A fun corner case when decoding dictionaries: the extension metadata lives with -// the dictionary (i.e., the non-index type); however, the field metadata still -// needs to exist on the field. 
-static int ArrowIpcMoveNonExtensionFieldMetadataBackToFieldIfNeeded( - struct ArrowSchema* schema) { +// When decoding dictionaries, we move the value type to the schema->dictionary +// member, but we need to move the field metadata back because in IPC there +// is no such thing as dictionary metadata (even extension metadata) +// https://github.com/apache/arrow/issues/49704 +static int ArrowIpcMoveDictionaryMetadataBackToField(struct ArrowSchema* schema) { NANOARROW_DCHECK(schema->dictionary != NULL); - struct ArrowMetadataReader reader; - NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderInit(&reader, schema->dictionary->metadata)); - - // For the most common case (no metadata), nothing needs to be done here - if (reader.remaining_keys == 0) { - return NANOARROW_OK; - } - - struct ArrowBuffer field_metadata; - struct ArrowBuffer extension_metadata; - NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderInit(&field_metadata, NULL)); - ArrowErrorCode result = ArrowMetadataBuilderInit(&extension_metadata, NULL); - if (result != NANOARROW_OK) { - ArrowBufferReset(&field_metadata); - return result; - } - - const struct ArrowStringView extension_name_key = ArrowCharView("ARROW:extension:name"); - const struct ArrowStringView extension_metadata_key = - ArrowCharView("ARROW:extension:metadata"); - - struct ArrowStringView key; - struct ArrowStringView value; - while (reader.remaining_keys > 0) { - result = ArrowMetadataReaderRead(&reader, &key, &value); - if (result != NANOARROW_OK) { - ArrowBufferReset(&field_metadata); - ArrowBufferReset(&extension_metadata); - return result; - } - - int key_is_extension_name = - key.size_bytes == extension_name_key.size_bytes && - strncmp(key.data, extension_name_key.data, key.size_bytes) == 0; - int key_is_extension_metadata = - key.size_bytes == extension_metadata_key.size_bytes && - strncmp(key.data, extension_metadata_key.data, key.size_bytes) == 0; - if (!key_is_extension_name && !key_is_extension_metadata) { - result = 
ArrowMetadataBuilderAppend(&field_metadata, key, value); - if (result != NANOARROW_OK) { - ArrowBufferReset(&field_metadata); - ArrowBufferReset(&extension_metadata); - return result; - } - } else { - result = ArrowMetadataBuilderAppend(&extension_metadata, key, value); - if (result != NANOARROW_OK) { - ArrowBufferReset(&field_metadata); - ArrowBufferReset(&extension_metadata); - return result; - } - } - } - - result = ArrowSchemaSetMetadata(schema, (char*)field_metadata.data); - if (result != NANOARROW_OK) { - ArrowBufferReset(&field_metadata); - ArrowBufferReset(&extension_metadata); - return result; - } - - result = ArrowSchemaSetMetadata(schema->dictionary, (char*)extension_metadata.data); - ArrowBufferReset(&field_metadata); - ArrowBufferReset(&extension_metadata); - - return result; + NANOARROW_RETURN_NOT_OK(ArrowSchemaSetMetadata(schema, schema->dictionary->metadata)); + NANOARROW_RETURN_NOT_OK(ArrowSchemaSetMetadata(schema->dictionary, NULL)); + return NANOARROW_OK; } static int ArrowIpcSetDictionaryEncoding( @@ -1313,10 +1306,9 @@ static int ArrowIpcSetDictionaryEncoding( schema->flags |= ARROW_FLAG_DICTIONARY_ORDERED; } - // Field metadata should stay with the field; however, we need the extension metadata - // to stay with the dictionary. 
- NANOARROW_RETURN_NOT_OK_WITH_ERROR( - ArrowIpcMoveNonExtensionFieldMetadataBackToFieldIfNeeded(schema), error); + // Sort out field metadata between the schema and the dictionary member + NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowIpcMoveDictionaryMetadataBackToField(schema), + error); // Track the identifier if we have a dictionaries object in which to track it if (dictionaries != NULL) { @@ -2354,6 +2346,7 @@ struct ArrowIpcArraySetter { struct ArrowIpcBufferSource src; struct ArrowIpcBufferFactory factory; enum ArrowIpcMetadataVersion version; + struct ArrowIpcDictionaries* dictionaries; }; static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset, @@ -2420,6 +2413,12 @@ static int ArrowIpcDecoderWalkGetArray(struct ArrowArrayView* array_view, array_view->children[i], array->children[i], out->children[i], error)); } + if (array_view->dictionary != NULL) { + // TODO: this currently copies the array for every output. + NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderWalkGetArray( + array_view->dictionary, array->dictionary, out->dictionary, error)); + } + return NANOARROW_OK; } @@ -2430,10 +2429,32 @@ static int ArrowIpcDecoderWalkSetArrayView(struct ArrowIpcDecoder* decoder, struct ArrowError* error) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; - struct ArrowIpcField* ipc_field = private_data->fields + setter->field_i; + + // setter->field_i indexes the flatbuffer FieldNode vector (which excludes the root + // struct), but private_data->fields includes the root at index 0, so add 1. 
+ struct ArrowIpcField* ipc_field = private_data->fields + setter->field_i + 1; if (ipc_field->dictionary_id != NANOARROW_IPC_NO_DICTIONARY_ID) { - ArrowErrorSet(error, "Decoding a dictionary-encoding field is not supported"); - return ENOTSUP; + if (setter->dictionaries == NULL) { + ArrowErrorSet( + error, "Can't decode a dictionary-encoded field without ArrowIpcDictionaries"); + return ENOTSUP; + } + + const struct ArrowArray* dictionary; + NANOARROW_RETURN_NOT_OK(ArrowIpcDictionariesFindCurrentValue( + setter->dictionaries, ipc_field->dictionary_id, &dictionary, error)); + + if (dictionary->release == NULL) { + ArrowErrorSet(error, "Dictionary with ID %" PRId64 " is marked as released", + ipc_field->dictionary_id); + return EINVAL; + } + + // Set the dictionary array view from the value. We may be able to skip this + // if we can somehow detect that the dictionary hasn't changed since the last + // decode. + NANOARROW_RETURN_NOT_OK( + ArrowArrayViewSetArray(array_view->dictionary, dictionary, error)); } ns(FieldNode_struct_t) field = @@ -2538,7 +2559,8 @@ static ArrowErrorCode ArrowIpcDecoderDecodeArrayInternal( static ArrowErrorCode ArrowIpcDecoderDecodeArrayViewInternal( struct ArrowIpcDecoder* decoder, struct ArrowIpcBufferFactory factory, - int64_t field_i, struct ArrowArrayView** out_view, struct ArrowError* error) { + int64_t field_i, struct ArrowIpcDictionaries* dictionaries, + struct ArrowArrayView** out_view, struct ArrowError* error) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; @@ -2563,6 +2585,7 @@ static ArrowErrorCode ArrowIpcDecoderDecodeArrayViewInternal( setter.src.codec = decoder->codec; setter.src.swap_endian = ArrowIpcDecoderNeedsSwapEndian(decoder); setter.version = decoder->metadata_version; + setter.dictionaries = dictionaries; // If we are going to need a decompressor here, ensure the default one is // initialized. 
@@ -2600,10 +2623,10 @@ static ArrowErrorCode ArrowIpcDecoderDecodeArrayViewInternal( return NANOARROW_OK; } -ArrowErrorCode ArrowIpcDecoderDecodeArrayView(struct ArrowIpcDecoder* decoder, - struct ArrowBufferView body, int64_t i, - struct ArrowArrayView** out, - struct ArrowError* error) { +NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayViewWithDictionaries( + struct ArrowIpcDecoder* decoder, struct ArrowBufferView body, int64_t i, + struct ArrowIpcDictionaries* dictionaries, struct ArrowArrayView** out, + struct ArrowError* error) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; if (private_data->last_message == NULL || @@ -2613,14 +2636,21 @@ ArrowErrorCode ArrowIpcDecoderDecodeArrayView(struct ArrowIpcDecoder* decoder, } return ArrowIpcDecoderDecodeArrayViewInternal( - decoder, ArrowIpcBufferFactoryFromView(&body), i, out, error); + decoder, ArrowIpcBufferFactoryFromView(&body), i, dictionaries, out, error); } -ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder, - struct ArrowBufferView body, int64_t i, - struct ArrowArray* out, - enum ArrowValidationLevel validation_level, - struct ArrowError* error) { +ArrowErrorCode ArrowIpcDecoderDecodeArrayView(struct ArrowIpcDecoder* decoder, + struct ArrowBufferView body, int64_t i, + struct ArrowArrayView** out, + struct ArrowError* error) { + return ArrowIpcDecoderDecodeArrayViewWithDictionaries(decoder, body, i, NULL, out, + error); +} + +NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayWithDictionaries( + struct ArrowIpcDecoder* decoder, struct ArrowBufferView body, int64_t i, + struct ArrowIpcDictionaries* dictionaries, struct ArrowArray* out, + enum ArrowValidationLevel validation_level, struct ArrowError* error) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; if (private_data->last_message == NULL || @@ -2631,7 +2661,8 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct 
ArrowIpcDecoder* decoder, struct ArrowArrayView* array_view; NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayViewInternal( - decoder, ArrowIpcBufferFactoryFromView(&body), i, &array_view, error)); + decoder, ArrowIpcBufferFactoryFromView(&body), i, dictionaries, &array_view, + error)); NANOARROW_RETURN_NOT_OK(ArrowArrayViewValidate(array_view, validation_level, error)); @@ -2649,13 +2680,23 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder, return NANOARROW_OK; } -ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared( +ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder, + struct ArrowBufferView body, int64_t i, + struct ArrowArray* out, + enum ArrowValidationLevel validation_level, + struct ArrowError* error) { + return ArrowIpcDecoderDecodeArrayWithDictionaries(decoder, body, i, NULL, out, + validation_level, error); +} + +ArrowErrorCode ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* body, int64_t i, - struct ArrowArray* out, enum ArrowValidationLevel validation_level, - struct ArrowError* error) { + struct ArrowIpcDictionaries* dictionaries, struct ArrowArray* out, + enum ArrowValidationLevel validation_level, struct ArrowError* error) { struct ArrowArrayView* array_view; NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayViewInternal( - decoder, ArrowIpcBufferFactoryFromShared(body), i, &array_view, error)); + decoder, ArrowIpcBufferFactoryFromShared(body), i, dictionaries, &array_view, + error)); NANOARROW_RETURN_NOT_OK(ArrowArrayViewValidate(array_view, validation_level, error)); @@ -2673,14 +2714,18 @@ ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared( return NANOARROW_OK; } -NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionary( - struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared, - enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries* dictionaries, +ArrowErrorCode 
ArrowIpcDecoderDecodeArrayFromShared( + struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* body, int64_t i, + struct ArrowArray* out, enum ArrowValidationLevel validation_level, struct ArrowError* error) { - NANOARROW_DCHECK(decoder != NULL); - NANOARROW_DCHECK(shared != NULL); - NANOARROW_DCHECK(dictionaries != NULL); + return ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries(decoder, body, i, NULL, out, + validation_level, error); +} +static ArrowErrorCode ArrowIpcDecoderDecodeDictionaryInternal( + struct ArrowIpcDecoder* decoder, struct ArrowIpcBufferFactory factory, + enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries* dictionaries, + struct ArrowError* error) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; @@ -2709,15 +2754,26 @@ NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionary( (struct ArrowIpcDecoderPrivate*)dictionary->decoder.private_data; dictionary->decoder.message_type = NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH; dictionary_decoder_private_data->last_message = record_batch; + // Transfer the endianness setting so that buffers are byte-swapped if needed + dictionary_decoder_private_data->endianness = private_data->endianness; - struct ArrowArray tmp; + struct ArrowArrayView* array_view; + NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayViewInternal( + &dictionary->decoder, factory, 0, dictionaries, &array_view, error)); - // TODO: provide ArrowIpcDecoderDecodeArrayInternalWithDictionaries to handle nested - // dictionaries - NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayFromShared( - &dictionary->decoder, shared, 0, &tmp, validation_level, error)); + NANOARROW_RETURN_NOT_OK(ArrowArrayViewValidate(array_view, validation_level, error)); + + struct ArrowArray tmp; + tmp.release = NULL; + int result = ArrowIpcDecoderDecodeArrayInternal(&dictionary->decoder, 0, &tmp, + validation_level, error); + if (result != NANOARROW_OK && tmp.release != NULL) { + 
ArrowArrayRelease(&tmp); + return result; + } else if (result != NANOARROW_OK) { + return result; + } - ArrowErrorCode result; if (decoder->dictionary->is_delta) { result = ArrowIpcDictionaryAppend(dictionary, &tmp, error); } else { @@ -2731,3 +2787,28 @@ NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionary( return NANOARROW_OK; } + +NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionary( + struct ArrowIpcDecoder* decoder, struct ArrowBufferView body, + enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries* dictionaries, + struct ArrowError* error) { + NANOARROW_DCHECK(decoder != NULL); + NANOARROW_DCHECK(dictionaries != NULL); + + return ArrowIpcDecoderDecodeDictionaryInternal(decoder, + ArrowIpcBufferFactoryFromView(&body), + validation_level, dictionaries, error); +} + +NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionaryFromShared( + struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared, + enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries* dictionaries, + struct ArrowError* error) { + NANOARROW_DCHECK(decoder != NULL); + NANOARROW_DCHECK(shared != NULL); + NANOARROW_DCHECK(dictionaries != NULL); + + return ArrowIpcDecoderDecodeDictionaryInternal(decoder, + ArrowIpcBufferFactoryFromShared(shared), + validation_level, dictionaries, error); +} diff --git a/src/nanoarrow/ipc/decoder_test.cc b/src/nanoarrow/ipc/decoder_test.cc index b6aeddb77..2143b610a 100644 --- a/src/nanoarrow/ipc/decoder_test.cc +++ b/src/nanoarrow/ipc/decoder_test.cc @@ -208,6 +208,19 @@ alignas(8) static uint8_t kDictionaryBatch[] = { 0x6f, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, }; +alignas(8) static uint8_t kDictionaryRecordBatch[] = { + 0xff, 0xff, 0xff, 0xff, 0x88, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x0c, 0x00, 0x16, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0c, 0x00, + 0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x18, 0x00, 0x00, 0x00, 0x08, 0x00, + 
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x18, 0x00, 0x0c, 0x00, + 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00, 0x10, 0x00, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00}; + TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) { struct ArrowIpcDecoder decoder; struct ArrowError error; @@ -698,7 +711,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeDictionaryBatchDecode) { // Make a dictionary encoded schema that matches that of the dictionary example batch ArrowSchemaInit(&schema); ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema, 1), NANOARROW_OK); - ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT32), NANOARROW_OK); + ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT8), NANOARROW_OK); ASSERT_EQ(ArrowSchemaAllocateDictionary(schema.children[0]), NANOARROW_OK); ASSERT_EQ( ArrowSchemaInitFromType(schema.children[0]->dictionary, NANOARROW_TYPE_STRING), @@ -721,11 +734,12 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeDictionaryBatchDecode) { // Check that we can't decode a dictionary batch if we haven't read a dictionary batch // message ASSERT_EQ(ArrowIpcDecoderInit(&decoder), NANOARROW_OK); - struct ArrowIpcSharedBuffer shared; - ASSERT_EQ( - ArrowIpcDecoderDecodeDictionary(&decoder, &shared, NANOARROW_VALIDATION_LEVEL_FULL, - &dictionaries, &error), - EINVAL); + struct ArrowBufferView body; + body.data.data = nullptr; + body.size_bytes = 0; + ASSERT_EQ(ArrowIpcDecoderDecodeDictionary( + &decoder, body, NANOARROW_VALIDATION_LEVEL_FULL, &dictionaries, 
&error), + EINVAL); ASSERT_STREQ(error.message, "decoder did not just decode a DictionaryBatch message"); // Decode a dictionary batch and inspect metadata @@ -741,17 +755,12 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeDictionaryBatchDecode) { EXPECT_FALSE(decoder.dictionary->is_delta); // Decode the dictionary batch - data.data.as_uint8 += decoder.header_size_bytes; - data.size_bytes = decoder.body_size_bytes; - struct ArrowBuffer body; - ArrowBufferInit(&body); - ASSERT_EQ(ArrowBufferAppendBufferView(&body, data), NANOARROW_OK); + body.data.as_uint8 = data.data.as_uint8 + decoder.header_size_bytes; + body.size_bytes = decoder.body_size_bytes; - ASSERT_EQ(ArrowIpcSharedBufferInit(&shared, &body), NANOARROW_OK); - ASSERT_EQ( - ArrowIpcDecoderDecodeDictionary(&decoder, &shared, NANOARROW_VALIDATION_LEVEL_FULL, - &dictionaries, &error), - NANOARROW_OK); + ASSERT_EQ(ArrowIpcDecoderDecodeDictionary( + &decoder, body, NANOARROW_VALIDATION_LEVEL_FULL, &dictionaries, &error), + NANOARROW_OK); // If we find the current value of the dictionary we should get the correct array const struct ArrowArray* dictionary_value; @@ -775,10 +784,9 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeDictionaryBatchDecode) { // If we try to decode the dictionary again it should succeed (because the dictionary // is in replacement mode) - ASSERT_EQ( - ArrowIpcDecoderDecodeDictionary(&decoder, &shared, NANOARROW_VALIDATION_LEVEL_FULL, - &dictionaries, &error), - NANOARROW_OK); + ASSERT_EQ(ArrowIpcDecoderDecodeDictionary( + &decoder, body, NANOARROW_VALIDATION_LEVEL_FULL, &dictionaries, &error), + NANOARROW_OK); ASSERT_EQ(ArrowArrayViewSetArray(&array_view, dictionary_value, &error), NANOARROW_OK); ASSERT_EQ(array_view.length, 3); @@ -788,14 +796,75 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeDictionaryBatchDecode) { // If we try to decode a delta dictionary, we should fail with a reasonable message const_cast(decoder.dictionary)->is_delta = 1; - ASSERT_EQ( - 
ArrowIpcDecoderDecodeDictionary(&decoder, &shared, NANOARROW_VALIDATION_LEVEL_FULL, - &dictionaries, &error), - ENOTSUP); + ASSERT_EQ(ArrowIpcDecoderDecodeDictionary( + &decoder, body, NANOARROW_VALIDATION_LEVEL_FULL, &dictionaries, &error), + ENOTSUP); ASSERT_STREQ(error.message, "Dictionary concatenation is not yet supported"); + // After all of this, we should be able to actually decode a RecordBatch + ASSERT_EQ(ArrowIpcDecoderSetSchemaWithDictionaries(&decoder, &schema, + &dictionary_encodings, &error), + NANOARROW_OK); + data.data.data = kDictionaryRecordBatch; + data.size_bytes = sizeof(kDictionaryRecordBatch); + ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), NANOARROW_OK); + data.data.as_uint8 += decoder.header_size_bytes; + data.size_bytes -= decoder.header_size_bytes; + + // Decode the entire batch and check the dictionary + struct ArrowArrayView* batch_view; + ASSERT_EQ(ArrowIpcDecoderDecodeArrayViewWithDictionaries( + &decoder, data, -1, &dictionaries, &batch_view, &error), + NANOARROW_OK) + << error.message; + + ASSERT_NE(batch_view->children[0]->dictionary, nullptr); + ASSERT_EQ(batch_view->children[0]->dictionary->length, 3); + ASSERT_EQ(ArrowArrayViewGetStringUnsafe(batch_view->children[0]->dictionary, 0), + "zero"_asv); + + // Decode the specific column and check the dictionary + struct ArrowArrayView* column_view; + ASSERT_EQ(ArrowIpcDecoderDecodeArrayViewWithDictionaries( + &decoder, data, 0, &dictionaries, &column_view, &error), + NANOARROW_OK) + << error.message; + + ASSERT_NE(column_view->dictionary, nullptr); + ASSERT_EQ(column_view->dictionary->length, 3); + ASSERT_EQ(ArrowArrayViewGetStringUnsafe(column_view->dictionary, 0), "zero"_asv); + + // Decode the array from the ArrowBufferView + struct ArrowArray batch; + ASSERT_EQ(ArrowIpcDecoderDecodeArrayWithDictionaries( + &decoder, data, -1, &dictionaries, &batch, + NANOARROW_VALIDATION_LEVEL_FULL, &error), + NANOARROW_OK) + << error.message; + 
ASSERT_NE(batch.children[0]->dictionary, nullptr); + ASSERT_EQ(batch.children[0]->dictionary->length, 3); + ArrowArrayRelease(&batch); + + // Decode the array from a shared buffer + struct ArrowBuffer record_batch_body; + ArrowBufferInit(&record_batch_body); + ASSERT_EQ(ArrowBufferAppendBufferView(&record_batch_body, data), NANOARROW_OK); + + struct ArrowIpcSharedBuffer record_batch_shared; + ASSERT_EQ(ArrowIpcSharedBufferInit(&record_batch_shared, &record_batch_body), + NANOARROW_OK); + + ASSERT_EQ(ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( + &decoder, &record_batch_shared, -1, &dictionaries, &batch, + NANOARROW_VALIDATION_LEVEL_FULL, &error), + NANOARROW_OK) + << error.message; + ASSERT_NE(batch.children[0]->dictionary, nullptr); + ASSERT_EQ(batch.children[0]->dictionary->length, 3); + ArrowArrayRelease(&batch); + ArrowIpcSharedBufferReset(&record_batch_shared); + ArrowArrayViewReset(&array_view); - ArrowIpcSharedBufferReset(&shared); ArrowIpcDictionariesReset(&dictionaries); ArrowIpcDictionaryEncodingsReset(&dictionary_encodings); ArrowSchemaRelease(&schema); @@ -1140,7 +1209,14 @@ std::string ArrowSchemaToString(const struct ArrowSchema* schema) { #if defined(NANOARROW_BUILD_TESTS_WITH_ARROW) TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcNanoarrowTypeRoundtrip) { if (GetParam()->id() == arrow::Type::DICTIONARY) { - GTEST_SKIP() << "Dictionary array decode is not yet supported"; + GTEST_SKIP() << "Dictionary array encode is not yet supported"; + } + + if (GetParam()->id() == arrow::Type::EXTENSION && + std::static_pointer_cast(GetParam())->storage_type()->id() == + arrow::Type::DICTIONARY) { + GTEST_SKIP() + << "nanoarrow encoder cannot yet encode extension types with dictionary storage"; } nanoarrow::UniqueSchema schema; @@ -1182,16 +1258,20 @@ TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcNanoarrowTypeRoundtrip) { #if defined(NANOARROW_BUILD_TESTS_WITH_ARROW) TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowArrayRoundtrip) 
{ - if (GetParam()->id() == arrow::Type::DICTIONARY) { - GTEST_SKIP() << "Dictionary array decode is not yet supported"; + const std::shared_ptr& data_type = GetParam(); + + if (data_type->id() == arrow::Type::DICTIONARY && + std::static_pointer_cast(data_type)->value_type()->id() == + Type::EXTENSION) { + GTEST_SKIP() + << "Arrow C++ MakeEmpty() doesn't support dictionary with extension value types"; } - const std::shared_ptr& data_type = GetParam(); std::shared_ptr dummy_schema = arrow::schema({arrow::field("dummy_name", data_type)}); auto maybe_empty = arrow::RecordBatch::MakeEmpty(dummy_schema); - ASSERT_TRUE(maybe_empty.ok()); + ASSERT_TRUE(maybe_empty.ok()) << maybe_empty.status(); auto empty = maybe_empty.ValueUnsafe(); auto maybe_nulls_array = arrow::MakeArrayOfNull(data_type, 3); @@ -1206,10 +1286,24 @@ TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowArrayRoundtrip) { struct ArrowBufferView buffer_view; struct ArrowArray array; - // Initialize the decoder + // Initialize the schema ASSERT_TRUE(arrow::ExportSchema(*dummy_schema, &schema).ok()); + + // Initialize the dictionaries + struct ArrowIpcDictionaryEncodings dictionary_encodings; + struct ArrowIpcDictionaries dictionaries; + ArrowIpcDictionaryEncodingsInit(&dictionary_encodings); + ASSERT_EQ(ArrowIpcDictionaryEncodingsAppendSchema(&dictionary_encodings, &schema), + NANOARROW_OK); + ASSERT_EQ(ArrowIpcDictionariesInit(&dictionaries, &dictionary_encodings, nullptr), + NANOARROW_OK); + + // Initialize the decoder ArrowIpcDecoderInit(&decoder); - ASSERT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, nullptr), NANOARROW_OK); + ASSERT_EQ(ArrowIpcDecoderSetSchemaWithDictionaries(&decoder, &schema, + &dictionary_encodings, nullptr), + NANOARROW_OK); + ArrowIpcDictionaryEncodingsReset(&dictionary_encodings); // Check the empty array auto maybe_serialized = arrow::ipc::SerializeRecordBatch(*empty, options); @@ -1220,14 +1314,22 @@ TEST_P(ArrowTypeParameterizedTestFixture, 
NanoarrowIpcArrowArrayRoundtrip) { ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, buffer_view, nullptr), NANOARROW_OK); buffer_view.data.as_uint8 += decoder.header_size_bytes; buffer_view.size_bytes -= decoder.header_size_bytes; - ASSERT_EQ(ArrowIpcDecoderDecodeArray(&decoder, buffer_view, -1, &array, - NANOARROW_VALIDATION_LEVEL_FULL, nullptr), + ASSERT_EQ(ArrowIpcDecoderDecodeArrayWithDictionaries( + &decoder, buffer_view, -1, &dictionaries, &array, + NANOARROW_VALIDATION_LEVEL_FULL, nullptr), NANOARROW_OK); auto maybe_batch = arrow::ImportRecordBatch(&array, dummy_schema); ASSERT_TRUE(maybe_batch.ok()); EXPECT_EQ(maybe_batch.ValueUnsafe()->ToString(), empty->ToString()); - EXPECT_TRUE(maybe_batch.ValueUnsafe()->Equals(*empty)); + + // Arrow C++ MakeEmpty() loses the ordered=1 flag for dictionary types. + // https://github.com/apache/arrow/issues/49674 + // So for ordered dictionaries, we only check ToString() equality for empty batches. + if (data_type->id() != arrow::Type::DICTIONARY || + !std::static_pointer_cast(data_type)->ordered()) { + EXPECT_TRUE(maybe_batch.ValueUnsafe()->Equals(*empty)) << empty->ToString(); + } // Check the array with 3 null values maybe_serialized = arrow::ipc::SerializeRecordBatch(*nulls, options); @@ -1238,8 +1340,9 @@ TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowArrayRoundtrip) { ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, buffer_view, nullptr), NANOARROW_OK); buffer_view.data.as_uint8 += decoder.header_size_bytes; buffer_view.size_bytes -= decoder.header_size_bytes; - ASSERT_EQ(ArrowIpcDecoderDecodeArray(&decoder, buffer_view, -1, &array, - NANOARROW_VALIDATION_LEVEL_FULL, nullptr), + ASSERT_EQ(ArrowIpcDecoderDecodeArrayWithDictionaries( + &decoder, buffer_view, -1, &dictionaries, &array, + NANOARROW_VALIDATION_LEVEL_FULL, nullptr), NANOARROW_OK); maybe_batch = arrow::ImportRecordBatch(&array, dummy_schema); @@ -1248,6 +1351,7 @@ TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowArrayRoundtrip) { 
EXPECT_TRUE(maybe_batch.ValueUnsafe()->Equals(*nulls)); ArrowSchemaRelease(&schema); + ArrowIpcDictionariesReset(&dictionaries); ArrowIpcDecoderReset(&decoder); } #endif @@ -1281,6 +1385,13 @@ TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcNanoarrowArrayRoundtrip) { GTEST_SKIP() << "nanoarrow encoder cannot yet encode dictionaries"; } + if (GetParam()->id() == arrow::Type::EXTENSION && + std::static_pointer_cast(GetParam())->storage_type()->id() == + arrow::Type::DICTIONARY) { + GTEST_SKIP() + << "nanoarrow encoder cannot yet encode extension types with dictionary storage"; + } + struct ArrowError error; nanoarrow::UniqueSchema schema; ASSERT_TRUE( @@ -1334,6 +1445,43 @@ TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcNanoarrowArrayRoundtrip) { } } +// Extension type with dictionary storage for testing +class DictExtensionType : public ExtensionType { + public: + explicit DictExtensionType() : ExtensionType(dictionary(int32(), utf8())) {} + + std::string extension_name() const override { return "dict-extension"; } + + bool ExtensionEquals(const ExtensionType& other) const override { + return other.extension_name() == extension_name(); + } + + std::shared_ptr MakeArray(std::shared_ptr data) const override { + return std::make_shared(data); + } + + Result> Deserialize( + std::shared_ptr storage_type, + const std::string& serialized) const override { + return std::make_shared(); + } + + std::string Serialize() const override { return ""; } +}; + +std::shared_ptr dict_extension() { + static bool registered = false; + auto type = std::make_shared(); + if (!registered) { + auto status = RegisterExtensionType(type); + if (!status.ok() && !status.IsKeyError()) { + status.Abort(); + } + registered = true; + } + return type; +} + INSTANTIATE_TEST_SUITE_P( NanoarrowIpcTest, ArrowTypeParameterizedTestFixture, ::testing::Values( @@ -1383,8 +1531,12 @@ INSTANTIATE_TEST_SUITE_P( arrow::dictionary(arrow::int32(), arrow::utf8(), true), // Extension type 
arrow::extension::uuid(), - // Dictionary-encoded extension - arrow::dictionary(arrow::int32(), arrow::extension::uuid()))); + // Extension type with dictionary as the storage type + dict_extension() + // Dictionary-encoded extension is not supported in IPC + // https://github.com/apache/arrow/issues/49704 + // arrow::dictionary(arrow::int32(), arrow::extension::uuid())) + )); class ArrowSchemaParameterizedTestFixture : public ::testing::TestWithParam> { @@ -1545,11 +1697,12 @@ INSTANTIATE_TEST_SUITE_P( // Dictionary with field metadata arrow::schema({arrow::field( "some_name", arrow::dictionary(arrow::int32(), arrow::utf8()), - arrow::KeyValueMetadata::Make({"key1", "key2"}, {"value1", "value2"}))}), - // Dictionary with field metadata - arrow::schema({arrow::field( - "some_name", arrow::dictionary(arrow::int32(), arrow::extension::uuid()), - arrow::KeyValueMetadata::Make({"key1", "key2"}, {"value1", "value2"}))}))); + arrow::KeyValueMetadata::Make({"key1", "key2"}, {"value1", "value2"}))}) + // Dictionary with extension storage and field metadata is not supported in IPC + // arrow::schema({arrow::field( + // "some_name", arrow::dictionary(arrow::int32(), arrow::extension::uuid()), + // arrow::KeyValueMetadata::Make({"key1", "key2"}, {"value1", "value2"}))}) + )); class ArrowTypeIdParameterizedTestFixture : public ::testing::TestWithParam { diff --git a/src/nanoarrow/ipc/files_test.cc b/src/nanoarrow/ipc/files_test.cc index 6a1d7c787..6b70a0c7f 100644 --- a/src/nanoarrow/ipc/files_test.cc +++ b/src/nanoarrow/ipc/files_test.cc @@ -24,6 +24,7 @@ #if defined(NANOARROW_BUILD_TESTS_WITH_ARROW) #include #include +#include #include #include #include @@ -51,10 +52,12 @@ using namespace arrow; // would read. 
class TestFile { public: - TestFile(std::string path, int expected_return_code, std::string expected_error_message) + TestFile(std::string path, int expected_return_code, std::string expected_error_message, + bool write_supported = true) : path_(path), expected_return_code_(expected_return_code), - expected_error_message_(expected_error_message) {} + expected_error_message_(expected_error_message), + write_supported_(write_supported) {} TestFile(std::string path) : TestFile(path, NANOARROW_OK, "") {} @@ -62,6 +65,10 @@ class TestFile { static TestFile OK(std::string path) { return TestFile(path); } + static TestFile ReadOnly(std::string path) { + return TestFile(path, NANOARROW_OK, "", false); + } + static TestFile Err(int code, std::string path, std::string message = "__any__") { return TestFile(path, code, message); } @@ -228,7 +235,8 @@ class TestFile { return ArrowIpcWriterWriteArrayView(writer.get(), nullptr, error); } - void TestEqualsArrowCpp(const std::string& dir_prefix) { + void TestEqualsArrowCpp(const std::string& dir_prefix, + bool check_write_roundtrip = true) { std::stringstream path_builder; path_builder << dir_prefix << "/" << path_; @@ -251,11 +259,15 @@ class TestFile { GTEST_FAIL() << MakeError(NANOARROW_OK, ""); } - // Write back to a buffer using nanoarrow + // Write back to a buffer using nanoarrow if supported. We do this here + // because we need to move the arrays into the comparison for the Arrow C++ + // read. 
nanoarrow::UniqueBuffer roundtripped; - ASSERT_EQ(WriteNanoarrowStream(schema, arrays, roundtripped.get(), &error), - NANOARROW_OK) - << error.message; + if (write_supported_) { + ASSERT_EQ(WriteNanoarrowStream(schema, arrays, roundtripped.get(), &error), + NANOARROW_OK) + << error.message; + } // Read the same file with Arrow C++ auto maybe_table_arrow = ReadTable(io::ReadableFile::Open(path_builder.str())); @@ -266,6 +278,11 @@ class TestFile { maybe_table_arrow.ValueUnsafe()); } + // For types that aren't supported by the writer yet + if (!write_supported_) { + return; + } + auto maybe_table_roundtripped = ReadTable(BufferInputStream(roundtripped.get())); { SCOPED_TRACE("Read the roundtripped buffer using Arrow C++"); @@ -348,6 +365,7 @@ class TestFile { // Use testing utility to compare nanoarrow::testing::TestingJSONComparison comparison; + comparison.set_compare_metadata_order(false); ASSERT_EQ(comparison.CompareArrayStream(ipc_stream.get(), json_stream.get(), &error), NANOARROW_OK) << error.message; @@ -378,6 +396,7 @@ class TestFile { std::string path_; int expected_return_code_; std::string expected_error_message_; + bool write_supported_; }; // For better testing output @@ -404,7 +423,13 @@ class TestEndianFileFixture : public ::testing::TestWithParam { TestFile test_file; }; +bool EnsureUuidIsNotRegistered() { + return arrow::UnregisterExtensionType("arrow.uuid").ok(); +} + TEST_P(TestEndianFileFixture, NanoarrowIpcTestFileNativeEndian) { + EnsureUuidIsNotRegistered(); + std::stringstream dir_builder; ArrowError error; ArrowErrorInit(&error); @@ -422,6 +447,8 @@ TEST_P(TestEndianFileFixture, NanoarrowIpcTestFileNativeEndian) { } TEST_P(TestEndianFileFixture, NanoarrowIpcTestFileSwapEndian) { + EnsureUuidIsNotRegistered(); + std::stringstream dir_builder; ArrowError error; ArrowErrorInit(&error); @@ -439,6 +466,8 @@ TEST_P(TestEndianFileFixture, NanoarrowIpcTestFileSwapEndian) { } TEST_P(TestEndianFileFixture, NanoarrowIpcTestFileCheckJSON) { + 
EnsureUuidIsNotRegistered(); + std::stringstream dir_builder; ArrowError error; ArrowErrorInit(&error); @@ -477,20 +506,10 @@ INSTANTIATE_TEST_SUITE_P( TestFile::OK("generated_primitive.stream"), TestFile::OK("generated_recursive_nested.stream"), TestFile::OK("generated_union.stream"), - - // Files with features that are not yet supported (Dictionary encoding) - TestFile::NotSupported( - "generated_dictionary_unsigned.stream", - "Found valid dictionary batch but dictionary encoding is not yet supported"), - TestFile::NotSupported( - "generated_dictionary.stream", - "Found valid dictionary batch but dictionary encoding is not yet supported"), - TestFile::NotSupported( - "generated_nested_dictionary.stream", - "Found valid dictionary batch but dictionary encoding is not yet supported"), - TestFile::NotSupported( - "generated_extension.stream", - "Found valid dictionary batch but dictionary encoding is not yet supported") + TestFile::ReadOnly("generated_dictionary_unsigned.stream"), + TestFile::ReadOnly("generated_dictionary.stream"), + TestFile::ReadOnly("generated_nested_dictionary.stream"), + TestFile::ReadOnly("generated_extension.stream") // Comment to keep last line from wrapping )); diff --git a/src/nanoarrow/ipc/reader.c b/src/nanoarrow/ipc/reader.c index c70b14483..897508241 100644 --- a/src/nanoarrow/ipc/reader.c +++ b/src/nanoarrow/ipc/reader.c @@ -191,6 +191,7 @@ struct ArrowIpcArrayStreamReaderPrivate { struct ArrowBuffer header; struct ArrowBuffer body; int32_t expected_header_prefix_size; + struct ArrowIpcDictionaries dictionaries; struct ArrowError error; }; @@ -211,13 +212,16 @@ static void ArrowIpcArrayStreamReaderRelease(struct ArrowArrayStream* stream) { ArrowBufferReset(&private_data->header); ArrowBufferReset(&private_data->body); + if (private_data->dictionaries.private_data != NULL) { + ArrowIpcDictionariesReset(&private_data->dictionaries); + } + ArrowFree(private_data); stream->release = NULL; } static int ArrowIpcArrayStreamReaderNextHeader( 
- struct ArrowIpcArrayStreamReaderPrivate* private_data, - enum ArrowIpcMessageType message_type) { + struct ArrowIpcArrayStreamReaderPrivate* private_data, int schema_expected) { private_data->header.size_bytes = 0; int64_t bytes_read = 0; @@ -332,7 +336,10 @@ static int ArrowIpcArrayStreamReaderNextHeader( // Don't decode the message if it's of the wrong type (because the error message // is better communicated by the caller) - if (private_data->decoder.message_type != message_type) { + if ((schema_expected && + private_data->decoder.message_type != NANOARROW_IPC_MESSAGE_TYPE_SCHEMA) || + (!schema_expected && + private_data->decoder.message_type == NANOARROW_IPC_MESSAGE_TYPE_SCHEMA)) { return NANOARROW_OK; } @@ -372,8 +379,7 @@ static int ArrowIpcArrayStreamReaderReadSchemaIfNeeded( return NANOARROW_OK; } - NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextHeader( - private_data, NANOARROW_IPC_MESSAGE_TYPE_SCHEMA)); + NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextHeader(private_data, 1)); // Error if this isn't a schema message if (private_data->decoder.message_type != NANOARROW_IPC_MESSAGE_TYPE_SCHEMA) { @@ -415,11 +421,21 @@ static int ArrowIpcArrayStreamReaderReadSchemaIfNeeded( return ENOTSUP; } + // Initialize dictionary decoders + int result = ArrowIpcDictionariesInit(&private_data->dictionaries, + &dictionary_encodings, &private_data->error); + if (result != NANOARROW_OK) { + ArrowIpcDictionaryEncodingsReset(&dictionary_encodings); + ArrowSchemaRelease(&tmp); + return result; + } + // Notify the decoder of the schema for forthcoming messages - int result = ArrowIpcDecoderSetSchemaWithDictionaries( + result = ArrowIpcDecoderSetSchemaWithDictionaries( &private_data->decoder, &tmp, &dictionary_encodings, &private_data->error); ArrowIpcDictionaryEncodingsReset(&dictionary_encodings); if (result != NANOARROW_OK) { + ArrowIpcDictionariesReset(&private_data->dictionaries); ArrowSchemaRelease(&tmp); return result; } @@ -437,19 +453,72 @@ static int 
ArrowIpcArrayStreamReaderGetSchema(struct ArrowArrayStream* stream, return ArrowSchemaDeepCopy(&private_data->out_schema, out); } -static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream, - struct ArrowArray* out) { - struct ArrowIpcArrayStreamReaderPrivate* private_data = - (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data; - ArrowErrorInit(&private_data->error); - NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderReadSchemaIfNeeded(private_data)); +static int ArrowIpcArrayStreamReaderProcessRecordBatch( + struct ArrowIpcArrayStreamReaderPrivate* private_data, struct ArrowArray* out) { + // Read in the body + NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data)); + + if (private_data->use_shared_buffers) { + struct ArrowIpcSharedBuffer shared; + NANOARROW_RETURN_NOT_OK_WITH_ERROR( + ArrowIpcSharedBufferInit(&shared, &private_data->body), &private_data->error); + int result = ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( + &private_data->decoder, &shared, private_data->field_index, + &private_data->dictionaries, out, NANOARROW_VALIDATION_LEVEL_FULL, + &private_data->error); + ArrowIpcSharedBufferReset(&shared); + NANOARROW_RETURN_NOT_OK(result); + } else { + struct ArrowBufferView body_view; + body_view.data.data = private_data->body.data; + body_view.size_bytes = private_data->body.size_bytes; + + NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayWithDictionaries( + &private_data->decoder, body_view, private_data->field_index, + &private_data->dictionaries, out, NANOARROW_VALIDATION_LEVEL_FULL, + &private_data->error)); + } + + return NANOARROW_OK; +} + +static int ArrowIpcArrayStreamReaderProcessDictionary( + struct ArrowIpcArrayStreamReaderPrivate* private_data) { + // Read in the body + NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data)); + + if (private_data->use_shared_buffers) { + // Decode the dictionary + struct ArrowIpcSharedBuffer shared; + NANOARROW_RETURN_NOT_OK_WITH_ERROR( 
+ ArrowIpcSharedBufferInit(&shared, &private_data->body), &private_data->error); + int result = ArrowIpcDecoderDecodeDictionaryFromShared( + &private_data->decoder, &shared, NANOARROW_VALIDATION_LEVEL_FULL, + &private_data->dictionaries, &private_data->error); + ArrowIpcSharedBufferReset(&shared); + NANOARROW_RETURN_NOT_OK(result); + } else { + struct ArrowBufferView body_view; + body_view.data.data = private_data->body.data; + body_view.size_bytes = private_data->body.size_bytes; + NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeDictionary( + &private_data->decoder, body_view, NANOARROW_VALIDATION_LEVEL_FULL, + &private_data->dictionaries, &private_data->error)); + } + return NANOARROW_OK; +} + +static int ArrowIpcArrayStreamReaderProcessMessage( + struct ArrowIpcArrayStreamReaderPrivate* private_data, + enum ArrowIpcMessageType* message_type, struct ArrowArray* out) { // Read + decode the next header - int result = ArrowIpcArrayStreamReaderNextHeader( - private_data, NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH); + int result = ArrowIpcArrayStreamReaderNextHeader(private_data, 0); if (result == ENODATA) { // Stream is finished either because there is no input or because - // end of stream bytes were read. + // end of stream bytes were read. Read this as a RecordBatch in the + // sense that we populate out->release to NULL and return OK. 
+ *message_type = NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH; out->release = NULL; return NANOARROW_OK; } else if (result != NANOARROW_OK) { @@ -457,44 +526,43 @@ static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream, return result; } - // Make sure we have a RecordBatch message + // Make sure we have a RecordBatch message or DictionaryBatch message switch (private_data->decoder.message_type) { case NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH: - break; + *message_type = NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH; + return ArrowIpcArrayStreamReaderProcessRecordBatch(private_data, out); case NANOARROW_IPC_MESSAGE_TYPE_DICTIONARY_BATCH: - ArrowErrorSet( - &private_data->error, - "Found valid dictionary batch but dictionary encoding is not yet supported"); - return ENOTSUP; + *message_type = NANOARROW_IPC_MESSAGE_TYPE_DICTIONARY_BATCH; + return ArrowIpcArrayStreamReaderProcessDictionary(private_data); default: ArrowErrorSet(&private_data->error, - "Unexpected message type (expected RecordBatch)"); + "Unexpected message type (expected RecordBatch or DictionaryBatch)"); return EINVAL; } +} - // Read in the body - NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data)); +static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream, + struct ArrowArray* out) { + struct ArrowIpcArrayStreamReaderPrivate* private_data = + (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data; + ArrowErrorInit(&private_data->error); + NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderReadSchemaIfNeeded(private_data)); + enum ArrowIpcMessageType message_type; struct ArrowArray tmp; + tmp.release = NULL; + + do { + int result = + ArrowIpcArrayStreamReaderProcessMessage(private_data, &message_type, &tmp); + if (result != NANOARROW_OK) { + if (tmp.release != NULL) { + ArrowArrayRelease(&tmp); + } - if (private_data->use_shared_buffers) { - struct ArrowIpcSharedBuffer shared; - NANOARROW_RETURN_NOT_OK_WITH_ERROR( - 
ArrowIpcSharedBufferInit(&shared, &private_data->body), &private_data->error); - result = ArrowIpcDecoderDecodeArrayFromShared( - &private_data->decoder, &shared, private_data->field_index, &tmp, - NANOARROW_VALIDATION_LEVEL_FULL, &private_data->error); - ArrowIpcSharedBufferReset(&shared); - NANOARROW_RETURN_NOT_OK(result); - } else { - struct ArrowBufferView body_view; - body_view.data.data = private_data->body.data; - body_view.size_bytes = private_data->body.size_bytes; - - NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArray( - &private_data->decoder, body_view, private_data->field_index, &tmp, - NANOARROW_VALIDATION_LEVEL_FULL, &private_data->error)); - } + return result; + } + } while (message_type != NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH); ArrowArrayMove(&tmp, out); return NANOARROW_OK; @@ -528,6 +596,7 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit( private_data->out_schema.release = NULL; ArrowIpcInputStreamMove(input_stream, &private_data->input); private_data->expected_header_prefix_size = kExpectedHeaderPrefixSizeNotSet; + private_data->dictionaries.private_data = NULL; if (options != NULL) { private_data->field_index = options->field_index; diff --git a/src/nanoarrow/ipc/reader_test.cc b/src/nanoarrow/ipc/reader_test.cc index 8139503aa..d257e111f 100644 --- a/src/nanoarrow/ipc/reader_test.cc +++ b/src/nanoarrow/ipc/reader_test.cc @@ -57,6 +57,64 @@ static uint8_t kSimpleRecordBatch[] = { 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; +alignas(8) static uint8_t kDictionarySchema[] = { + 0xff, 0xff, 0xff, 0xff, 0x50, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x0a, 0x00, 0x0e, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, + 0x00, 0x01, 0x04, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x0c, 0x00, + 0x00, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0xb0, 0x00, 0x00, 0x00, + 0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0c, 
0x00, 0x00, 0x00, 0x08, 0x00, + 0x0c, 0x00, 0x04, 0x00, 0x08, 0x00, 0x08, 0x00, 0x00, 0x00, 0x8c, 0x00, 0x00, 0x00, + 0x04, 0x00, 0x00, 0x00, 0x7e, 0x00, 0x00, 0x00, 0x41, 0x0a, 0x33, 0x0a, 0x32, 0x36, + 0x33, 0x31, 0x37, 0x30, 0x0a, 0x31, 0x39, 0x37, 0x38, 0x38, 0x38, 0x0a, 0x35, 0x0a, + 0x55, 0x54, 0x46, 0x2d, 0x38, 0x0a, 0x35, 0x33, 0x31, 0x0a, 0x31, 0x0a, 0x35, 0x33, + 0x31, 0x0a, 0x31, 0x0a, 0x32, 0x35, 0x34, 0x0a, 0x31, 0x30, 0x32, 0x36, 0x0a, 0x31, + 0x0a, 0x32, 0x36, 0x32, 0x31, 0x35, 0x33, 0x0a, 0x35, 0x0a, 0x6e, 0x61, 0x6d, 0x65, + 0x73, 0x0a, 0x31, 0x36, 0x0a, 0x31, 0x0a, 0x32, 0x36, 0x32, 0x31, 0x35, 0x33, 0x0a, + 0x38, 0x0a, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x63, 0x6f, 0x6c, 0x0a, 0x32, 0x35, 0x34, + 0x0a, 0x31, 0x30, 0x32, 0x36, 0x0a, 0x35, 0x31, 0x31, 0x0a, 0x31, 0x36, 0x0a, 0x31, + 0x0a, 0x32, 0x36, 0x32, 0x31, 0x35, 0x33, 0x0a, 0x37, 0x0a, 0x63, 0x6f, 0x6c, 0x75, + 0x6d, 0x6e, 0x73, 0x0a, 0x32, 0x35, 0x34, 0x0a, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, + 0x72, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x10, 0x00, + 0x18, 0x00, 0x08, 0x00, 0x06, 0x00, 0x07, 0x00, 0x0c, 0x00, 0x10, 0x00, 0x14, 0x00, + 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x05, 0x14, 0x00, 0x00, 0x00, 0x48, 0x00, + 0x00, 0x00, 0x24, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x08, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x63, 0x6f, 0x6c, 0x00, 0x00, + 0x00, 0x00, 0x08, 0x00, 0x08, 0x00, 0x00, 0x00, 0x04, 0x00, 0x08, 0x00, 0x00, 0x00, + 0x0c, 0x00, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x08, 0x00, 0x07, 0x00, 0x08, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x08, 0x00, 0x00, 0x00, 0x04, 0x00, 0x04, 0x00, + 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + +alignas(8) static uint8_t kDictionaryBatch[] = { + 0xff, 0xff, 0xff, 0xff, 0xa8, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x0c, 0x00, 0x14, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0c, 0x00, + 0x0c, 0x00, 0x00, 0x00, 0x00, 0x02, 0x04, 0x00, 0x14, 
0x00, 0x00, 0x00, 0x20, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x04, 0x00, + 0x08, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x18, 0x00, + 0x0c, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x4c, 0x00, 0x00, 0x00, + 0x10, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, + 0x00, 0x00, 0x07, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x7a, 0x65, 0x72, 0x6f, + 0x6f, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, +}; + +alignas(8) static uint8_t kDictionaryRecordBatch[] = { + 0xff, 0xff, 0xff, 0xff, 0x88, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x0c, 0x00, 0x16, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0c, 0x00, + 0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x18, 0x00, 0x00, 0x00, 0x08, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x18, 0x00, 0x0c, 0x00, + 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00, 0x10, 0x00, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 
0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00}; + static uint8_t kEndOfStream[] = {0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00}; TEST(NanoarrowIpcReader, InputStreamBuffer) { @@ -335,7 +393,8 @@ TEST(NanoarrowIpcReader, StreamReaderExpectedRecordBatch) { struct ArrowArray array; struct ArrowError error; ASSERT_EQ(ArrowArrayStreamGetNext(&stream, &array, &error), EINVAL); - EXPECT_STREQ(error.message, "Unexpected message type (expected RecordBatch)"); + EXPECT_STREQ(error.message, + "Unexpected message type (expected RecordBatch or DictionaryBatch)"); ArrowArrayStreamRelease(&stream); } @@ -482,3 +541,105 @@ TEST(NanoarrowIpcReader, StreamReaderIncompletePrefix) { ArrowArrayStreamRelease(&stream); } + +TEST(NanoarrowIpcReader, StreamReaderDictionary) { + struct ArrowBuffer input_buffer; + ArrowBufferInit(&input_buffer); + ASSERT_EQ( + ArrowBufferAppend(&input_buffer, kDictionarySchema, sizeof(kDictionarySchema)), + NANOARROW_OK); + ASSERT_EQ(ArrowBufferAppend(&input_buffer, kDictionaryBatch, sizeof(kDictionaryBatch)), + NANOARROW_OK); + ASSERT_EQ(ArrowBufferAppend(&input_buffer, kDictionaryRecordBatch, + sizeof(kDictionaryRecordBatch)), + NANOARROW_OK); + + struct ArrowIpcInputStream input; + ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK); + + struct ArrowArrayStream stream; + ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK); + + struct ArrowSchema schema; + ASSERT_EQ(ArrowArrayStreamGetSchema(&stream, &schema, nullptr), NANOARROW_OK); + EXPECT_STREQ(schema.format, "+s"); + ASSERT_EQ(schema.n_children, 1); + // Dictionary-encoded field with int8 indices + EXPECT_STREQ(schema.children[0]->format, "c"); + ASSERT_NE(schema.children[0]->dictionary, nullptr); + // Dictionary values are utf8 strings + EXPECT_STREQ(schema.children[0]->dictionary->format, "u"); + ArrowSchemaRelease(&schema); + + struct ArrowArray array; + ASSERT_EQ(ArrowArrayStreamGetNext(&stream, &array, nullptr), NANOARROW_OK); + 
EXPECT_EQ(array.length, 3); + ASSERT_EQ(array.n_children, 1); + // The child should have a dictionary + ASSERT_NE(array.children[0]->dictionary, nullptr); + EXPECT_EQ(array.children[0]->dictionary->length, 3); + ArrowArrayRelease(&array); + + ASSERT_EQ(ArrowArrayStreamGetNext(&stream, &array, nullptr), NANOARROW_OK); + EXPECT_EQ(array.release, nullptr); + + ArrowArrayStreamRelease(&stream); +} + +TEST(NanoarrowIpcReader, StreamReaderDictionaryBatchWithoutDictionarySchema) { + // Send a dictionary batch when the schema has no dictionaries + struct ArrowBuffer input_buffer; + ArrowBufferInit(&input_buffer); + ASSERT_EQ(ArrowBufferAppend(&input_buffer, kSimpleSchema, sizeof(kSimpleSchema)), + NANOARROW_OK); + ASSERT_EQ(ArrowBufferAppend(&input_buffer, kDictionaryBatch, sizeof(kDictionaryBatch)), + NANOARROW_OK); + + struct ArrowIpcInputStream input; + ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK); + + struct ArrowArrayStream stream; + ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK); + + struct ArrowSchema schema; + ASSERT_EQ(ArrowArrayStreamGetSchema(&stream, &schema, nullptr), NANOARROW_OK); + ArrowSchemaRelease(&schema); + + struct ArrowArray array; + struct ArrowError error; + ASSERT_NE(ArrowArrayStreamGetNext(&stream, &array, &error), NANOARROW_OK); + ASSERT_GT(strlen(ArrowArrayStreamGetLastError(&stream)), 0); + + ArrowArrayStreamRelease(&stream); +} + +TEST(NanoarrowIpcReader, StreamReaderRecordBatchWithoutDictionaryBatch) { + // Send a record batch referencing a dictionary before the dictionary values arrive + struct ArrowBuffer input_buffer; + ArrowBufferInit(&input_buffer); + ASSERT_EQ( + ArrowBufferAppend(&input_buffer, kDictionarySchema, sizeof(kDictionarySchema)), + NANOARROW_OK); + // Skip the dictionary batch and go straight to the record batch + ASSERT_EQ(ArrowBufferAppend(&input_buffer, kDictionaryRecordBatch, + sizeof(kDictionaryRecordBatch)), + NANOARROW_OK); + + struct 
ArrowIpcInputStream input; + ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK); + + struct ArrowArrayStream stream; + ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK); + + struct ArrowSchema schema; + ASSERT_EQ(ArrowArrayStreamGetSchema(&stream, &schema, nullptr), NANOARROW_OK); + ArrowSchemaRelease(&schema); + + struct ArrowArray array; + struct ArrowError error; + // Should error because dictionary values were never provided + ASSERT_EQ(ArrowArrayStreamGetNext(&stream, &array, &error), EINVAL); + ASSERT_GT(strlen(ArrowArrayStreamGetLastError(&stream)), 0); + + ArrowArrayStreamRelease(&stream); +} diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h index e2b114cd0..f4c8ef2cf 100644 --- a/src/nanoarrow/nanoarrow_ipc.h +++ b/src/nanoarrow/nanoarrow_ipc.h @@ -53,10 +53,17 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeSchemaWithDictionaries) #define ArrowIpcDecoderDecodeArrayView \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArrayView) +#define ArrowIpcDecoderDecodeArrayViewWithDictionaries \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArrayViewWithDictionaries) #define ArrowIpcDecoderDecodeArray \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArray) +#define ArrowIpcDecoderDecodeArrayWithDictionaries \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArrayWithDictionaries) #define ArrowIpcDecoderDecodeArrayFromShared \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArrayFromShared) +#define ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, \ + ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries) #define ArrowIpcDecoderSetSchema \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetSchema) #define ArrowIpcDecoderSetSchemaWithDictionaries \ @@ -113,6 +120,8 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionaryEncodingsInit) #define 
ArrowIpcDictionaryEncodingsAppend \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionaryEncodingsAppend) +#define ArrowIpcDictionaryEncodingsAppendSchema \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionaryEncodingsAppendSchema) #define ArrowIpcDictionaryEncodingsFind \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionaryEncodingsFind) #define ArrowIpcDictionaryEncodingsFindById \ @@ -129,6 +138,8 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDictionariesReset) #define ArrowIpcDecoderDecodeDictionary \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeDictionary) +#define ArrowIpcDecoderDecodeDictionaryFromShared \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeDictionaryFromShared) #endif @@ -243,6 +254,12 @@ NANOARROW_DLL ArrowErrorCode ArrowIpcDictionaryEncodingsAppend( struct ArrowIpcDictionaryEncodings* dictionary_encodings, struct ArrowIpcDictionaryEncoding encoding); +/// \brief Append all dictionaries in schema identified according to a depth-first +/// recursive search starting at 0 +NANOARROW_DLL ArrowErrorCode ArrowIpcDictionaryEncodingsAppendSchema( + struct ArrowIpcDictionaryEncodings* dictionary_encodings, + const struct ArrowSchema* schema); + /// \brief Resolve a ArrowIpcDictionaryEncoding for a given dictionary encoded field /// /// Returns NULL if the pointed to schema does not match any of the pointed to @@ -588,7 +605,7 @@ NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderSetSchemaWithDictionaries( NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderSetEndianness( struct ArrowIpcDecoder* decoder, enum ArrowIpcEndianness endianness); -/// \brief Decode an ArrowArrayView +/// \brief Decode an ArrowArrayView with dictionary decoding support /// /// After a successful call to ArrowIpcDecoderDecodeHeader(), deserialize the content /// of body into an internally-managed ArrowArrayView and return it. 
Note that field index @@ -600,11 +617,23 @@ NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderSetEndianness( /// will not perform any heap allocations; however, the buffers referred to by the /// returned ArrowArrayView are only valid as long as the buffer referred to by body stays /// valid. +NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayViewWithDictionaries( + struct ArrowIpcDecoder* decoder, struct ArrowBufferView body, int64_t i, + struct ArrowIpcDictionaries* dictionaries, struct ArrowArrayView** out, + struct ArrowError* error); + +/// \brief Decode an ArrowArrayView without dictionary decoding +/// +/// After a successful call to ArrowIpcDecoderDecodeHeader(), deserialize the content +/// of body into an internally-managed ArrowArrayView and return it. +/// +/// This is equivalent to ArrowIpcDecoderDecodeArrayViewWithDictionaries() passing +/// dictionaries as NULL. NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayView( struct ArrowIpcDecoder* decoder, struct ArrowBufferView body, int64_t i, struct ArrowArrayView** out, struct ArrowError* error); -/// \brief Decode an ArrowArray +/// \brief Decode an ArrowArray with dictionary decoding support /// /// After a successful call to ArrowIpcDecoderDecodeHeader(), assemble an ArrowArray given /// a message body and a field index. Note that field index does not equate to column @@ -615,24 +644,65 @@ NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayView( /// Returns EINVAL if the decoder did not just decode a record batch message, ENOTSUP /// if the message uses features not supported by this library, or or NANOARROW_OK /// otherwise. 
+NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayWithDictionaries( + struct ArrowIpcDecoder* decoder, struct ArrowBufferView body, int64_t i, + struct ArrowIpcDictionaries* dictionaries, struct ArrowArray* out, + enum ArrowValidationLevel validation_level, struct ArrowError* error); + +/// \brief Decode an ArrowArray without dictionary decoding support +/// +/// After a successful call to ArrowIpcDecoderDecodeHeader(), assemble an ArrowArray given +/// a message body and a field index. +/// +/// This is equivalent to calling ArrowIpcDecoderDecodeArrayWithDictionaries() passing +/// dictionaries as NULL. NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArray( struct ArrowIpcDecoder* decoder, struct ArrowBufferView body, int64_t i, struct ArrowArray* out, enum ArrowValidationLevel validation_level, struct ArrowError* error); -/// \brief Decode an ArrowArray from an owned buffer +/// \brief Decode an ArrowArray from an owned buffer with dictionary decoding support /// /// This implementation takes advantage of the fact that it can avoid copying individual /// buffers. In all cases the caller must ArrowIpcSharedBufferReset() body after one or /// more calls to ArrowIpcDecoderDecodeArrayFromShared(). If /// ArrowIpcSharedBufferIsThreadSafe() returns 0, out must not be released by another /// thread. +NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( + struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared, int64_t i, + struct ArrowIpcDictionaries* dictionaries, struct ArrowArray* out, + enum ArrowValidationLevel validation_level, struct ArrowError* error); + +/// \brief Decode an ArrowArray from an owned buffer +/// +/// Equivalent to calling ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries() with +/// dictionaries as NULL. 
NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared( struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared, int64_t i, struct ArrowArray* out, enum ArrowValidationLevel validation_level, struct ArrowError* error); +/// \brief Decode an ArrowArray from a dictionary batch into the given +/// ArrowIpcDictionaries +/// +/// After a successful call to ArrowIpcDecoderDecodeHeader(), assemble an ArrowArray given +/// a message body and place it into out for the decoding of future dictionaries. Note +/// that other dictionaries in out may be used during the decoding if there are nested +/// dictionaries in this stream. The decoded value may be obtained with +/// ArrowIpcDictionariesFindCurrentValue. NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionary( + struct ArrowIpcDecoder* decoder, struct ArrowBufferView body, + enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries* out, + struct ArrowError* error); + +/// \brief Decode an ArrowArray from a dictionary batch from an owned buffer +/// +/// This implementation takes advantage of the fact that it can avoid copying individual +/// buffers. In all cases the caller must ArrowIpcSharedBufferReset() body after one or +/// more calls to ArrowIpcDecoderDecodeDictionaryFromShared(). If +/// ArrowIpcSharedBufferIsThreadSafe() returns 0, no batches decoded using out may +/// be released from another thread. +NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionaryFromShared( struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared, enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries* out, struct ArrowError* error);