diff --git a/.github/workflows/build-and-test-ipc.yaml b/.github/workflows/build-and-test-ipc.yaml index 2ae83deaf..22df7e325 100644 --- a/.github/workflows/build-and-test-ipc.yaml +++ b/.github/workflows/build-and-test-ipc.yaml @@ -44,7 +44,7 @@ jobs: matrix: config: - {label: default-build, cmake_args: "-DNANOARROW_BUILD_APPS=ON -DNANOARROW_IPC_WITH_ZSTD=ON -DNANOARROW_IPC_WITH_LZ4=ON"} - - {label: default-noatomics, cmake_args: "-DCMAKE_C_FLAGS='-DNANOARROW_IPC_USE_STDATOMIC=0'"} + - {label: default-noatomics, cmake_args: "-DCMAKE_C_FLAGS='-DNANOARROW_USE_STDATOMIC=0'"} - {label: shared-test-linkage, cmake_args: "-DNANOARROW_TEST_LINKAGE_SHARED=ON"} - {label: namespaced-build, cmake_args: "-DNANOARROW_NAMESPACE=SomeUserNamespace"} - {label: bundled-build, cmake_args: "-DNANOARROW_BUNDLE=ON"} diff --git a/r/src/altrep.c b/r/src/altrep.c index 847951b1c..2d0be3967 100644 --- a/r/src/altrep.c +++ b/r/src/altrep.c @@ -168,17 +168,16 @@ SEXP nanoarrow_c_make_altrep_chr(SEXP array_xptr) { // Ensure the array that we're attaching to this ALTREP object does not keep its // parent struct alive unnecessarily (i.e., a user can select only a few columns // and the memory for the unused columns will be released). 
- SEXP array_xptr_independent = PROTECT(array_xptr_ensure_independent(array_xptr)); + array_xptr_ensure_independent(array_xptr); - if (nanoarrow_converter_set_array(converter_xptr, array_xptr_independent) != - NANOARROW_OK) { + if (nanoarrow_converter_set_array(converter_xptr, array_xptr) != NANOARROW_OK) { nanoarrow_converter_stop(converter_xptr); } Rf_setAttrib(converter_xptr, R_ClassSymbol, nanoarrow_cls_altrep_chr); SEXP out = PROTECT(R_new_altrep(nanoarrow_altrep_chr_cls, converter_xptr, R_NilValue)); MARK_NOT_MUTABLE(out); - UNPROTECT(3); + UNPROTECT(2); return out; } diff --git a/src/nanoarrow/common/array.c b/src/nanoarrow/common/array.c index ec7bdb667..7184c827a 100644 --- a/src/nanoarrow/common/array.c +++ b/src/nanoarrow/common/array.c @@ -73,7 +73,7 @@ static void ArrowArrayReleaseInternal(struct ArrowArray* array) { array->release = NULL; } -static int ArrowArrayIsInternal(struct ArrowArray* array) { +int ArrowArrayIsInternal(struct ArrowArray* array) { return array->release == &ArrowArrayReleaseInternal; } @@ -568,6 +568,162 @@ ArrowErrorCode ArrowArrayFinishBuildingDefault(struct ArrowArray* array, return ArrowArrayFinishBuilding(array, NANOARROW_VALIDATION_LEVEL_DEFAULT, error); } +static int ArrowArrayIsShared(struct ArrowArray* array) { + if (!ArrowArrayIsInternal(array)) { + return 0; + } + + for (int64_t i = 0; i < array->n_buffers; i++) { + struct ArrowBuffer* buffer = ArrowArrayBuffer(array, i); + if (buffer->data != NULL && !ArrowIsSharedBuffer(buffer)) { + return 0; + } + } + + for (int64_t i = 0; i < array->n_children; i++) { + if (!ArrowArrayIsShared(array->children[i])) { + return 0; + } + } + + if (array->dictionary != NULL && !ArrowArrayIsShared(array->dictionary)) { + return 0; + } + + return 1; +} + +static ArrowErrorCode ArrowArrayMoveSharedInternal(struct ArrowArray* src, + struct ArrowArray* dst) { + if (ArrowArrayIsShared(src)) { + ArrowArrayMove(src, dst); + return NANOARROW_OK; + } + + 
NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromType(dst, NANOARROW_TYPE_UNINITIALIZED)); + + // Allocate children and move source children to dst children + NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateChildren(dst, src->n_children)); + for (int64_t i = 0; i < src->n_children; i++) { + NANOARROW_RETURN_NOT_OK( + ArrowArrayMoveSharedInternal(src->children[i], dst->children[i])); + } + + // Allocate dictionary if needed and move source dictionary to dst dictionary + if (src->dictionary != NULL) { + NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateDictionary(dst)); + NANOARROW_RETURN_NOT_OK( + ArrowArrayMoveSharedInternal(src->dictionary, dst->dictionary)); + } + + // We might need some more buffers if we are shallowly copying a string/binary view + if (src->n_buffers > 3) { + if (src->n_buffers > INT_MAX) { + return EINVAL; + } + + NANOARROW_RETURN_NOT_OK( + ArrowArrayAddVariadicBuffers(dst, (int32_t)src->n_buffers - 3)); + } + + // Move src into a shared array and set dst's buffers using the ref-counted version + struct ArrowSharedArray shared_array; + NANOARROW_RETURN_NOT_OK(ArrowSharedArrayInit(&shared_array, src)); + + for (int64_t i = 0; i < src->n_buffers; i++) { + struct ArrowBuffer* dst_buffer = ArrowArrayBuffer(dst, i); + ArrowErrorCode result = ArrowSharedArrayBuffer(&shared_array, i, dst_buffer); + if (result != NANOARROW_OK) { + ArrowSharedArrayRelease(&shared_array); + return result; + } + } + + ArrowSharedArrayRelease(&shared_array); + + dst->n_buffers = src->n_buffers; + dst->length = src->length; + dst->null_count = src->null_count; + dst->offset = src->offset; + + // Flush internal buffer pointers to array->buffers + NANOARROW_RETURN_NOT_OK(ArrowArrayFlushInternalPointers(dst)); + + return NANOARROW_OK; +} + +static ArrowErrorCode ArrowArrayCloneSharedInternal(struct ArrowArray* src, + struct ArrowArray* dst) { + NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromType(dst, NANOARROW_TYPE_UNINITIALIZED)); + + // Allocate children and clone source children to dst children + 
NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateChildren(dst, src->n_children)); + for (int64_t i = 0; i < src->n_children; i++) { + NANOARROW_RETURN_NOT_OK( + ArrowArrayCloneSharedInternal(src->children[i], dst->children[i])); + } + + // Allocate dictionary if needed and clone source dictionary to dst dictionary + if (src->dictionary != NULL) { + NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateDictionary(dst)); + NANOARROW_RETURN_NOT_OK( + ArrowArrayCloneSharedInternal(src->dictionary, dst->dictionary)); + } + + // We might need some more buffers if we are shallowly copying a string/binary view + if (src->n_buffers > 3) { + if (src->n_buffers > INT_MAX) { + return EINVAL; + } + + NANOARROW_RETURN_NOT_OK( + ArrowArrayAddVariadicBuffers(dst, (int32_t)src->n_buffers - 3)); + } + + for (int64_t i = 0; i < src->n_buffers; i++) { + struct ArrowBuffer* src_buffer = ArrowArrayBuffer(src, i); + struct ArrowBuffer* dst_buffer = ArrowArrayBuffer(dst, i); + NANOARROW_RETURN_NOT_OK(ArrowSharedBufferClone(src_buffer, dst_buffer)); + } + + dst->n_buffers = src->n_buffers; + dst->length = src->length; + dst->null_count = src->null_count; + dst->offset = src->offset; + + // Flush internal buffer pointers to array->buffers + NANOARROW_RETURN_NOT_OK(ArrowArrayFlushInternalPointers(dst)); + + return NANOARROW_OK; +} + +ArrowErrorCode ArrowArrayMoveShared(struct ArrowArray* array, struct ArrowArray* shared) { + struct ArrowArray tmp; + tmp.release = NULL; + ArrowErrorCode result = ArrowArrayMoveSharedInternal(array, shared); + if (result != NANOARROW_OK && tmp.release != NULL) { + ArrowArrayRelease(&tmp); + } + + return result; +} + +ArrowErrorCode ArrowArrayCloneShared(struct ArrowArray* shared, + struct ArrowArray* array) { + if (!ArrowArrayIsShared(shared)) { + return EINVAL; + } + + struct ArrowArray tmp; + tmp.release = NULL; + ArrowErrorCode result = ArrowArrayCloneSharedInternal(shared, array); + if (result != NANOARROW_OK && tmp.release != NULL) { + ArrowArrayRelease(&tmp); + } + + return 
result; +} + void ArrowArrayViewInitFromType(struct ArrowArrayView* array_view, enum ArrowType storage_type) { memset(array_view, 0, sizeof(struct ArrowArrayView)); diff --git a/src/nanoarrow/common/buffer_test.cc b/src/nanoarrow/common/buffer_test.cc index e294d8f1a..3b3cc8af6 100644 --- a/src/nanoarrow/common/buffer_test.cc +++ b/src/nanoarrow/common/buffer_test.cc @@ -17,7 +17,7 @@ #include #include -#include +#include #include @@ -704,3 +704,263 @@ TEST(BitmapTest, BitmapTestAppendInt32Unsafe) { ArrowBitmapReset(&bitmap); } + +TEST(SharedBufferTest, SharedBufferInitResetEmpty) { + struct ArrowBuffer src; + ArrowBufferInit(&src); + + struct ArrowBuffer shared; + ASSERT_EQ(ArrowSharedBufferInit(&shared, &src), NANOARROW_OK); + EXPECT_EQ(shared.data, nullptr); + ArrowBufferReset(&shared); +} + +TEST(SharedBufferTest, SharedBufferInitReset) { + struct ArrowBuffer src; + ArrowBufferInit(&src); + ASSERT_EQ(ArrowBufferAppend(&src, "1234", 4), NANOARROW_OK); + + struct ArrowBuffer shared; + ASSERT_EQ(ArrowSharedBufferInit(&shared, &src), NANOARROW_OK); + EXPECT_NE(shared.data, nullptr); + EXPECT_EQ(shared.size_bytes, 4); + EXPECT_EQ(memcmp(shared.data, "1234", 4), 0); + + ArrowBufferReset(&shared); + EXPECT_EQ(shared.data, nullptr); + EXPECT_EQ(shared.size_bytes, 0); +} + +TEST(SharedBufferTest, SharedBufferClone) { + struct ArrowBuffer src; + ArrowBufferInit(&src); + ASSERT_EQ(ArrowBufferAppend(&src, "abcdef", 6), NANOARROW_OK); + + struct ArrowBuffer shared; + ASSERT_EQ(ArrowSharedBufferInit(&shared, &src), NANOARROW_OK); + + // Clone the shared buffer + struct ArrowBuffer clone; + ASSERT_EQ(ArrowSharedBufferClone(&shared, &clone), NANOARROW_OK); + EXPECT_EQ(clone.data, shared.data); + EXPECT_EQ(clone.size_bytes, 6); + EXPECT_EQ(memcmp(clone.data, "abcdef", 6), 0); + + // Release the original shared buffer; clone should still be valid + ArrowBufferReset(&shared); + EXPECT_EQ(memcmp(clone.data, "abcdef", 6), 0); + + ArrowBufferReset(&clone); +} + 
+TEST(SharedBufferTest, SharedBufferCloneEmpty) { + struct ArrowBuffer src; + ArrowBufferInit(&src); + + struct ArrowBuffer shared; + ASSERT_EQ(ArrowSharedBufferInit(&shared, &src), NANOARROW_OK); + + struct ArrowBuffer clone; + ASSERT_EQ(ArrowSharedBufferClone(&shared, &clone), NANOARROW_OK); + EXPECT_EQ(clone.data, nullptr); + EXPECT_EQ(clone.size_bytes, 0); + + ArrowBufferReset(&shared); + ArrowBufferReset(&clone); +} + +TEST(SharedBufferTest, SharedBufferCloneNotShared) { + struct ArrowBuffer buf; + ArrowBufferInit(&buf); + ASSERT_EQ(ArrowBufferAppend(&buf, "abcdef", 6), NANOARROW_OK); + + struct ArrowBuffer clone; + EXPECT_EQ(ArrowSharedBufferClone(&buf, &clone), EINVAL); + + ArrowBufferReset(&buf); +} + +TEST(SharedBufferTest, SharedBufferThreadSafeClone) { + if (!ArrowSharedBufferIsThreadSafe()) { + GTEST_SKIP() << "ArrowSharedBufferIsThreadSafe() returned false"; + } + + struct ArrowBuffer src; + ArrowBufferInit(&src); + ASSERT_EQ(ArrowBufferAppend(&src, "abcdef", 6), NANOARROW_OK); + + struct ArrowBuffer shared; + ASSERT_EQ(ArrowSharedBufferInit(&shared, &src), NANOARROW_OK); + + // Clone into multiple buffers + struct ArrowBuffer clones[10]; + for (int i = 0; i < 10; i++) { + ASSERT_EQ(ArrowSharedBufferClone(&shared, &clones[i]), NANOARROW_OK); + } + + // Release the original + ArrowBufferReset(&shared); + + // Release clones from separate threads + std::thread threads[10]; + for (int i = 0; i < 10; i++) { + threads[i] = std::thread([&clones, i] { + EXPECT_EQ(memcmp(clones[i].data, "abcdef", 6), 0); + ArrowBufferReset(&clones[i]); + }); + } + + for (int i = 0; i < 10; i++) { + threads[i].join(); + } +} + +// Helper to build a simple int32 array with known data for shared array tests +static void MakeInt32Array(struct ArrowArray* array, const int32_t* values, int64_t n) { + ASSERT_EQ(ArrowArrayInitFromType(array, NANOARROW_TYPE_INT32), NANOARROW_OK); + ASSERT_EQ(ArrowArrayStartAppending(array), NANOARROW_OK); + for (int64_t i = 0; i < n; i++) { + 
ASSERT_EQ(ArrowArrayAppendInt(array, values[i]), NANOARROW_OK); + } + ASSERT_EQ(ArrowArrayFinishBuildingDefault(array, nullptr), NANOARROW_OK); +} + +TEST(SharedArrayTest, SharedArrayInitRelease) { + int32_t values[] = {1, 2, 3, 4}; + struct ArrowArray src; + MakeInt32Array(&src, values, 4); + ASSERT_NE(src.release, nullptr); + + struct ArrowSharedArray shared; + ASSERT_EQ(ArrowSharedArrayInit(&shared, &src), NANOARROW_OK); + // src should have been moved + EXPECT_EQ(src.release, nullptr); + + ArrowSharedArrayRelease(&shared); + EXPECT_EQ(shared.private_data, nullptr); +} + +TEST(SharedArrayTest, SharedArrayReleaseNull) { + struct ArrowSharedArray shared; + shared.private_data = nullptr; + // Should not crash + ArrowSharedArrayRelease(&shared); +} + +TEST(SharedArrayTest, SharedArrayBuffer) { + int32_t values[] = {10, 20, 30}; + struct ArrowArray src; + MakeInt32Array(&src, values, 3); + + struct ArrowSharedArray shared; + ASSERT_EQ(ArrowSharedArrayInit(&shared, &src), NANOARROW_OK); + + // Buffer 1 is the data buffer for int32 + struct ArrowBuffer buf; + ASSERT_EQ(ArrowSharedArrayBuffer(&shared, 1, &buf), NANOARROW_OK); + EXPECT_NE(buf.data, nullptr); + // Internal array: size should be known + EXPECT_EQ(buf.size_bytes, 3 * sizeof(int32_t)); + const int32_t* data = reinterpret_cast(buf.data); + EXPECT_EQ(data[0], 10); + EXPECT_EQ(data[1], 20); + EXPECT_EQ(data[2], 30); + + // Release the shared array; buffer should still be valid + ArrowSharedArrayRelease(&shared); + EXPECT_EQ(reinterpret_cast(buf.data)[0], 10); + + ArrowBufferReset(&buf); +} + +TEST(SharedArrayTest, SharedArrayBufferClone) { + int32_t values[] = {100, 200}; + struct ArrowArray src; + MakeInt32Array(&src, values, 2); + + struct ArrowSharedArray shared; + ASSERT_EQ(ArrowSharedArrayInit(&shared, &src), NANOARROW_OK); + + struct ArrowBuffer buf; + ASSERT_EQ(ArrowSharedArrayBuffer(&shared, 1, &buf), NANOARROW_OK); + + // Clone the buffer obtained from the shared array + struct ArrowBuffer clone; + 
ASSERT_EQ(ArrowSharedBufferClone(&buf, &clone), NANOARROW_OK); + EXPECT_EQ(clone.data, buf.data); + EXPECT_EQ(clone.size_bytes, buf.size_bytes); + + // Release original shared array and buffer; clone should still be valid + ArrowSharedArrayRelease(&shared); + ArrowBufferReset(&buf); + const int32_t* data = reinterpret_cast(clone.data); + EXPECT_EQ(data[0], 100); + EXPECT_EQ(data[1], 200); + + ArrowBufferReset(&clone); +} + +TEST(SharedArrayTest, SharedArrayMultipleBuffers) { + int32_t values[] = {1, 2, 3, 4}; + struct ArrowArray src; + MakeInt32Array(&src, values, 4); + + struct ArrowSharedArray shared; + ASSERT_EQ(ArrowSharedArrayInit(&shared, &src), NANOARROW_OK); + + // Extract both buffers + struct ArrowBuffer buf0, buf1; + ASSERT_EQ(ArrowSharedArrayBuffer(&shared, 0, &buf0), NANOARROW_OK); + ASSERT_EQ(ArrowSharedArrayBuffer(&shared, 1, &buf1), NANOARROW_OK); + + // Release shared array; buffers should keep data alive + ArrowSharedArrayRelease(&shared); + + EXPECT_EQ(buf1.size_bytes, 4 * sizeof(int32_t)); + const int32_t* data = reinterpret_cast(buf1.data); + EXPECT_EQ(data[0], 1); + EXPECT_EQ(data[3], 4); + + // Release one buffer; remaining buffer should still be valid + ArrowBufferReset(&buf0); + EXPECT_EQ(reinterpret_cast(buf1.data)[2], 3); + + ArrowBufferReset(&buf1); +} + +TEST(SharedArrayTest, SharedArrayThreadSafeBuffer) { + if (!ArrowSharedBufferIsThreadSafe()) { + GTEST_SKIP() << "ArrowSharedBufferIsThreadSafe() returned false"; + } + + int32_t values[] = {42, 43, 44, 45}; + struct ArrowArray src; + MakeInt32Array(&src, values, 4); + + struct ArrowSharedArray shared; + ASSERT_EQ(ArrowSharedArrayInit(&shared, &src), NANOARROW_OK); + + // Extract multiple buffer clones + struct ArrowBuffer buffers[10]; + for (int i = 0; i < 10; i++) { + ASSERT_EQ(ArrowSharedArrayBuffer(&shared, 1, &buffers[i]), NANOARROW_OK); + } + + // Release the shared array + ArrowSharedArrayRelease(&shared); + + // Release buffers from separate threads + std::thread threads[10]; + 
for (int i = 0; i < 10; i++) { + threads[i] = std::thread([&buffers, i] { + const int32_t* data = reinterpret_cast(buffers[i].data); + EXPECT_EQ(data[0], 42); + EXPECT_EQ(data[3], 45); + ArrowBufferReset(&buffers[i]); + }); + } + + for (int i = 0; i < 10; i++) { + threads[i].join(); + } +} diff --git a/src/nanoarrow/common/utils.c b/src/nanoarrow/common/utils.c index 5b5116a12..976a509bf 100644 --- a/src/nanoarrow/common/utils.c +++ b/src/nanoarrow/common/utils.c @@ -22,6 +22,29 @@ #include #include +// For thread safe shared buffers we need C11 + stdatomic.h +// Can compile with -DNANOARROW_USE_STDATOMIC=0 or 1 to override +// automatic detection +#if !defined(NANOARROW_USE_STDATOMIC) +#define NANOARROW_USE_STDATOMIC 0 + +// Check for C11 +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L + +// Check for GCC 4.8, which doesn't include stdatomic.h but does +// not define __STDC_NO_ATOMICS__ +#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ >= 5 + +#if !defined(__STDC_NO_ATOMICS__) +#include +#undef NANOARROW_USE_STDATOMIC +#define NANOARROW_USE_STDATOMIC 1 +#endif +#endif +#endif + +#endif + #include "nanoarrow/nanoarrow.h" const char* ArrowNanoarrowVersion(void) { return NANOARROW_VERSION; } @@ -276,6 +299,224 @@ struct ArrowBufferAllocator ArrowBufferDeallocator( return allocator; } +#if NANOARROW_USE_STDATOMIC +struct ArrowSharedBufferPrivate { + struct ArrowBuffer src; + atomic_long reference_count; +}; + +static int64_t ArrowSharedBufferUpdate(struct ArrowSharedBufferPrivate* private_data, + int delta) { + int64_t old_count = atomic_fetch_add(&private_data->reference_count, delta); + return old_count + delta; +} + +static void ArrowSharedBufferSet(struct ArrowSharedBufferPrivate* private_data, + int64_t count) { + atomic_store(&private_data->reference_count, count); +} + +int ArrowSharedBufferIsThreadSafe(void) { return 1; } + +struct ArrowSharedArrayPrivate { + struct ArrowArray src; + atomic_long reference_count; +}; + +static int64_t 
ArrowSharedArrayUpdate(struct ArrowSharedArrayPrivate* private_data, + int delta) { + int64_t old_count = atomic_fetch_add(&private_data->reference_count, delta); + return old_count + delta; +} + +static void ArrowSharedArraySet(struct ArrowSharedArrayPrivate* private_data, + int64_t count) { + atomic_store(&private_data->reference_count, count); +} +#else +struct ArrowSharedBufferPrivate { + struct ArrowBuffer src; + int64_t reference_count; +}; + +static int64_t ArrowSharedBufferUpdate(struct ArrowSharedBufferPrivate* private_data, + int delta) { + private_data->reference_count += delta; + return private_data->reference_count; +} + +static void ArrowSharedBufferSet(struct ArrowSharedBufferPrivate* private_data, + int64_t count) { + private_data->reference_count = count; +} + +int ArrowSharedBufferIsThreadSafe(void) { return 0; } + +struct ArrowSharedArrayPrivate { + struct ArrowArray src; + int64_t reference_count; +}; + +static int64_t ArrowSharedArrayUpdate(struct ArrowSharedArrayPrivate* private_data, + int delta) { + private_data->reference_count += delta; + return private_data->reference_count; +} + +static void ArrowSharedArraySet(struct ArrowSharedArrayPrivate* private_data, + int64_t count) { + private_data->reference_count = count; +} +#endif + +static void ArrowSharedBufferFree(struct ArrowBufferAllocator* allocator, uint8_t* ptr, + int64_t size) { + NANOARROW_UNUSED(allocator); + NANOARROW_UNUSED(ptr); + NANOARROW_UNUSED(size); + + struct ArrowSharedBufferPrivate* private_data = + (struct ArrowSharedBufferPrivate*)allocator->private_data; + + if (ArrowSharedBufferUpdate(private_data, -1) == 0) { + ArrowBufferReset(&private_data->src); + ArrowFree(private_data); + } +} + +static void ArrowSharedArrayBufferFree(struct ArrowBufferAllocator* allocator, + uint8_t* ptr, int64_t size) { + NANOARROW_UNUSED(ptr); + NANOARROW_UNUSED(size); + + struct ArrowSharedArrayPrivate* private_data = + (struct ArrowSharedArrayPrivate*)allocator->private_data; + + if 
(ArrowSharedArrayUpdate(private_data, -1) == 0) { + if (private_data->src.release != NULL) { + ArrowArrayRelease(&private_data->src); + } + ArrowFree(private_data); + } +} + +ArrowErrorCode ArrowSharedBufferInit(struct ArrowBuffer* shared, + struct ArrowBuffer* src) { + if (src->data == NULL) { + ArrowBufferMove(src, shared); + return NANOARROW_OK; + } + + struct ArrowSharedBufferPrivate* private_data = + (struct ArrowSharedBufferPrivate*)ArrowMalloc( + sizeof(struct ArrowSharedBufferPrivate)); + if (private_data == NULL) { + return ENOMEM; + } + + ArrowBufferMove(src, &private_data->src); + ArrowSharedBufferSet(private_data, 1); + + ArrowBufferInit(shared); + shared->data = private_data->src.data; + shared->size_bytes = private_data->src.size_bytes; + // Don't expose any extra capacity from src so that any calls to ArrowBufferAppend + // on this buffer will fail. + shared->capacity_bytes = private_data->src.size_bytes; + shared->allocator = ArrowBufferDeallocator(&ArrowSharedBufferFree, private_data); + return NANOARROW_OK; +} + +int ArrowIsSharedBuffer(struct ArrowBuffer* buffer) { + return buffer->allocator.free == &ArrowSharedBufferFree || + buffer->allocator.free == &ArrowSharedArrayBufferFree; +} + +ArrowErrorCode ArrowSharedBufferClone(struct ArrowBuffer* shared, + struct ArrowBuffer* shared_out) { + if (shared->size_bytes == 0) { + ArrowBufferInit(shared_out); + return NANOARROW_OK; + } + + if (shared->allocator.free == &ArrowSharedBufferFree) { + struct ArrowSharedBufferPrivate* private_data = + (struct ArrowSharedBufferPrivate*)shared->allocator.private_data; + ArrowSharedBufferUpdate(private_data, 1); + memcpy(shared_out, shared, sizeof(struct ArrowBuffer)); + return NANOARROW_OK; + } + + if (shared->allocator.free == &ArrowSharedArrayBufferFree) { + struct ArrowSharedArrayPrivate* private_data = + (struct ArrowSharedArrayPrivate*)shared->allocator.private_data; + ArrowSharedArrayUpdate(private_data, 1); + memcpy(shared_out, shared, sizeof(struct 
ArrowBuffer)); + return NANOARROW_OK; + } + + return EINVAL; +} + +ArrowErrorCode ArrowSharedArrayInit(struct ArrowSharedArray* shared, + struct ArrowArray* src) { + struct ArrowSharedArrayPrivate* private_data = + (struct ArrowSharedArrayPrivate*)ArrowMalloc( + sizeof(struct ArrowSharedArrayPrivate)); + if (private_data == NULL) { + return ENOMEM; + } + + ArrowArrayMove(src, &private_data->src); + ArrowSharedArraySet(private_data, 1); + shared->private_data = private_data; + return NANOARROW_OK; +} + +void ArrowSharedArrayRelease(struct ArrowSharedArray* shared) { + if (shared->private_data == NULL) { + return; + } + + struct ArrowSharedArrayPrivate* private_data = + (struct ArrowSharedArrayPrivate*)shared->private_data; + + if (ArrowSharedArrayUpdate(private_data, -1) == 0) { + if (private_data->src.release != NULL) { + ArrowArrayRelease(&private_data->src); + } + ArrowFree(private_data); + } + + shared->private_data = NULL; +} + +ArrowErrorCode ArrowSharedArrayBuffer(struct ArrowSharedArray* shared, int64_t i, + struct ArrowBuffer* out) { + struct ArrowSharedArrayPrivate* private_data = + (struct ArrowSharedArrayPrivate*)shared->private_data; + NANOARROW_DCHECK(i >= 0 && i < private_data->src.n_buffers); + + ArrowSharedArrayUpdate(private_data, 1); + ArrowBufferInit(out); + + if (ArrowArrayIsInternal(&private_data->src)) { + // The source array was built with nanoarrow, so we can get buffer size info + struct ArrowBuffer* src = ArrowArrayBuffer(&private_data->src, i); + out->data = src->data; + out->size_bytes = src->size_bytes; + out->capacity_bytes = src->size_bytes; + } else { + // Generic C Data Interface array: buffer sizes are not known + out->data = (uint8_t*)private_data->src.buffers[i]; + out->size_bytes = 0; + out->capacity_bytes = 0; + } + + out->allocator = ArrowBufferDeallocator(&ArrowSharedArrayBufferFree, private_data); + return NANOARROW_OK; +} + static const int kInt32DecimalDigits = 9; static const uint64_t kUInt32PowersOfTen[] = { diff --git 
a/src/nanoarrow/ipc/decoder.c b/src/nanoarrow/ipc/decoder.c index 4b0239c13..ef709b2a6 100644 --- a/src/nanoarrow/ipc/decoder.c +++ b/src/nanoarrow/ipc/decoder.c @@ -21,29 +21,6 @@ #include #include -// For thread safe shared buffers we need C11 + stdatomic.h -// Can compile with -DNANOARROW_IPC_USE_STDATOMIC=0 or 1 to override -// automatic detection -#if !defined(NANOARROW_IPC_USE_STDATOMIC) -#define NANOARROW_IPC_USE_STDATOMIC 0 - -// Check for C11 -#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L - -// Check for GCC 4.8, which doesn't include stdatomic.h but does -// not define __STDC_NO_ATOMICS__ -#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ >= 5 - -#if !defined(__STDC_NO_ATOMICS__) -#include -#undef NANOARROW_IPC_USE_STDATOMIC -#define NANOARROW_IPC_USE_STDATOMIC 1 -#endif -#endif -#endif - -#endif - #include "nanoarrow/ipc/flatcc_generated.h" #include "nanoarrow/nanoarrow.h" #include "nanoarrow/nanoarrow_ipc.h" @@ -169,104 +146,6 @@ ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error) { return NANOARROW_OK; } -#if NANOARROW_IPC_USE_STDATOMIC -struct ArrowIpcSharedBufferPrivate { - struct ArrowBuffer src; - atomic_long reference_count; -}; - -static int64_t ArrowIpcSharedBufferUpdate( - struct ArrowIpcSharedBufferPrivate* private_data, int delta) { - int64_t old_count = atomic_fetch_add(&private_data->reference_count, delta); - return old_count + delta; -} - -static void ArrowIpcSharedBufferSet(struct ArrowIpcSharedBufferPrivate* private_data, - int64_t count) { - atomic_store(&private_data->reference_count, count); -} - -int ArrowIpcSharedBufferIsThreadSafe(void) { return 1; } -#else -struct ArrowIpcSharedBufferPrivate { - struct ArrowBuffer src; - int64_t reference_count; -}; - -static int64_t ArrowIpcSharedBufferUpdate( - struct ArrowIpcSharedBufferPrivate* private_data, int delta) { - private_data->reference_count += delta; - return private_data->reference_count; -} - -static void ArrowIpcSharedBufferSet(struct 
ArrowIpcSharedBufferPrivate* private_data, - int64_t count) { - private_data->reference_count = count; -} - -int ArrowIpcSharedBufferIsThreadSafe(void) { return 0; } -#endif - -static void ArrowIpcSharedBufferFree(struct ArrowBufferAllocator* allocator, uint8_t* ptr, - int64_t size) { - NANOARROW_UNUSED(allocator); - NANOARROW_UNUSED(ptr); - NANOARROW_UNUSED(size); - - struct ArrowIpcSharedBufferPrivate* private_data = - (struct ArrowIpcSharedBufferPrivate*)allocator->private_data; - - if (ArrowIpcSharedBufferUpdate(private_data, -1) == 0) { - ArrowBufferReset(&private_data->src); - ArrowFree(private_data); - } -} - -ArrowErrorCode ArrowIpcSharedBufferInit(struct ArrowIpcSharedBuffer* shared, - struct ArrowBuffer* src) { - if (src->data == NULL) { - ArrowBufferMove(src, &shared->private_src); - return NANOARROW_OK; - } - - struct ArrowIpcSharedBufferPrivate* private_data = - (struct ArrowIpcSharedBufferPrivate*)ArrowMalloc( - sizeof(struct ArrowIpcSharedBufferPrivate)); - if (private_data == NULL) { - return ENOMEM; - } - - ArrowBufferMove(src, &private_data->src); - ArrowIpcSharedBufferSet(private_data, 1); - - ArrowBufferInit(&shared->private_src); - shared->private_src.data = private_data->src.data; - shared->private_src.size_bytes = private_data->src.size_bytes; - // Don't expose any extra capcity from src so that any calls to ArrowBufferAppend - // on this buffer will fail. 
- shared->private_src.capacity_bytes = private_data->src.size_bytes; - shared->private_src.allocator = - ArrowBufferDeallocator(&ArrowIpcSharedBufferFree, private_data); - return NANOARROW_OK; -} - -static void ArrowIpcSharedBufferClone(struct ArrowIpcSharedBuffer* shared, - struct ArrowBuffer* shared_out) { - if (shared->private_src.size_bytes == 0) { - ArrowBufferInit(shared_out); - return; - } - - struct ArrowIpcSharedBufferPrivate* private_data = - (struct ArrowIpcSharedBufferPrivate*)shared->private_src.allocator.private_data; - ArrowIpcSharedBufferUpdate(private_data, 1); - memcpy(shared_out, shared, sizeof(struct ArrowBuffer)); -} - -void ArrowIpcSharedBufferReset(struct ArrowIpcSharedBuffer* shared) { - ArrowBufferReset(&shared->private_src); -} - static int ArrowIpcDecoderNeedsSwapEndian(struct ArrowIpcDecoder* decoder) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; @@ -2051,7 +1930,7 @@ struct ArrowIpcBufferSource { /// has been read into memory. This abstraction is currently internal and exists /// to support the two obvious ways a user might go about this: (1) using a /// non-owned view of memory that must be copied slice-wise or (2) adding a reference -/// to an ArrowIpcSharedBuffer and returning a slice of that memory. +/// to an ArrowBuffer (shared buffer) and returning a slice of that memory. 
struct ArrowIpcBufferFactory { /// \brief User-defined callback to populate a buffer view /// @@ -2177,14 +2056,13 @@ static ArrowErrorCode ArrowIpcMakeBufferFromShared(struct ArrowIpcBufferFactory* struct ArrowBufferView* dst_view, struct ArrowBuffer* dst, struct ArrowError* error) { - struct ArrowIpcSharedBuffer* shared = - (struct ArrowIpcSharedBuffer*)factory->private_data; + struct ArrowBuffer* shared = (struct ArrowBuffer*)factory->private_data; int needs_decompression = 0; int uncompressed_data_offset = 0; if (src->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) { struct ArrowBufferView src_view; - src_view.data.as_uint8 = shared->private_src.data + src->body_offset_bytes; + src_view.data.as_uint8 = shared->data + src->body_offset_bytes; src_view.size_bytes = src->buffer_length_bytes; NANOARROW_RETURN_NOT_OK(ArrowIpcDecompressBufferFromView( factory->decompressor, src->codec, src_view, dst, &needs_decompression, error)); @@ -2193,7 +2071,7 @@ static ArrowErrorCode ArrowIpcMakeBufferFromShared(struct ArrowIpcBufferFactory* if (!needs_decompression) { ArrowBufferReset(dst); - ArrowIpcSharedBufferClone(shared, dst); + NANOARROW_RETURN_NOT_OK(ArrowSharedBufferClone(shared, dst)); dst->data += src->body_offset_bytes + uncompressed_data_offset; dst->size_bytes = src->buffer_length_bytes - uncompressed_data_offset; } @@ -2204,11 +2082,11 @@ static ArrowErrorCode ArrowIpcMakeBufferFromShared(struct ArrowIpcBufferFactory* } static struct ArrowIpcBufferFactory ArrowIpcBufferFactoryFromShared( - struct ArrowIpcSharedBuffer* shared) { + struct ArrowBuffer* shared) { struct ArrowIpcBufferFactory out; out.make_buffer = &ArrowIpcMakeBufferFromShared; out.decompressor = NULL; - out.buffer_length = shared->private_src.size_bytes; + out.buffer_length = shared->size_bytes; out.private_data = shared; return out; } @@ -2690,7 +2568,7 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder, } ArrowErrorCode ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( 
- struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* body, int64_t i, + struct ArrowIpcDecoder* decoder, struct ArrowBuffer* body, int64_t i, struct ArrowIpcDictionaries* dictionaries, struct ArrowArray* out, enum ArrowValidationLevel validation_level, struct ArrowError* error) { struct ArrowArrayView* array_view; @@ -2715,7 +2593,7 @@ ArrowErrorCode ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( } ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared( - struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* body, int64_t i, + struct ArrowIpcDecoder* decoder, struct ArrowBuffer* body, int64_t i, struct ArrowArray* out, enum ArrowValidationLevel validation_level, struct ArrowError* error) { return ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries(decoder, body, i, NULL, out, @@ -2801,7 +2679,7 @@ NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionary( } NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionaryFromShared( - struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared, + struct ArrowIpcDecoder* decoder, struct ArrowBuffer* shared, enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries* dictionaries, struct ArrowError* error) { NANOARROW_DCHECK(decoder != NULL); diff --git a/src/nanoarrow/ipc/decoder_test.cc b/src/nanoarrow/ipc/decoder_test.cc index 48ed0af1f..f1c8164a2 100644 --- a/src/nanoarrow/ipc/decoder_test.cc +++ b/src/nanoarrow/ipc/decoder_test.cc @@ -853,8 +853,8 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeDictionaryBatch) { ArrowBufferInit(&record_batch_body); ASSERT_EQ(ArrowBufferAppendBufferView(&record_batch_body, data), NANOARROW_OK); - struct ArrowIpcSharedBuffer record_batch_shared; - ASSERT_EQ(ArrowIpcSharedBufferInit(&record_batch_shared, &record_batch_body), + struct ArrowBuffer record_batch_shared; + ASSERT_EQ(ArrowSharedBufferInit(&record_batch_shared, &record_batch_body), NANOARROW_OK); ASSERT_EQ(ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( @@ -865,7 +865,7 @@ 
TEST(NanoarrowIpcTest, NanoarrowIpcDecodeDictionaryBatch) { ASSERT_NE(batch.children[0]->dictionary, nullptr); ASSERT_EQ(batch.children[0]->dictionary->length, 3); ArrowArrayRelease(&batch); - ArrowIpcSharedBufferReset(&record_batch_shared); + ArrowBufferReset(&record_batch_shared); ArrowArrayViewReset(&array_view); ArrowIpcDictionariesReset(&dictionaries); @@ -970,8 +970,8 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchFromShared) { decoder.body_size_bytes), NANOARROW_OK); - struct ArrowIpcSharedBuffer shared; - ASSERT_EQ(ArrowIpcSharedBufferInit(&shared, &body), NANOARROW_OK); + struct ArrowBuffer shared; + ASSERT_EQ(ArrowSharedBufferInit(&shared, &body), NANOARROW_OK); // Check full struct extract EXPECT_EQ(ArrowIpcDecoderDecodeArrayFromShared( @@ -996,7 +996,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchFromShared) { NANOARROW_OK); // Release the original shared (forthcoming array buffers should still be valid) - ArrowIpcSharedBufferReset(&shared); + ArrowBufferReset(&shared); ASSERT_EQ(array.n_buffers, 2); ASSERT_EQ(array.length, 3); @@ -1010,8 +1010,8 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchFromShared) { } TEST(NanoarrowIpcTest, NanoarrowIpcSharedBufferThreadSafeDecode) { - if (!ArrowIpcSharedBufferIsThreadSafe()) { - GTEST_SKIP() << "ArrowIpcSharedBufferIsThreadSafe() returned false"; + if (!ArrowSharedBufferIsThreadSafe()) { + GTEST_SKIP() << "ArrowSharedBufferIsThreadSafe() returned false"; } struct ArrowIpcDecoder decoder; @@ -1039,8 +1039,8 @@ TEST(NanoarrowIpcTest, NanoarrowIpcSharedBufferThreadSafeDecode) { decoder.body_size_bytes), NANOARROW_OK); - struct ArrowIpcSharedBuffer shared; - ASSERT_EQ(ArrowIpcSharedBufferInit(&shared, &body), NANOARROW_OK); + struct ArrowBuffer shared; + ASSERT_EQ(ArrowSharedBufferInit(&shared, &body), NANOARROW_OK); struct ArrowArray arrays[10]; for (int i = 0; i < 10; i++) { @@ -1051,7 +1051,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcSharedBufferThreadSafeDecode) { } // 
Clean up - ArrowIpcSharedBufferReset(&shared); + ArrowBufferReset(&shared); ArrowIpcDecoderReset(&decoder); ArrowSchemaRelease(&schema); diff --git a/src/nanoarrow/ipc/ipc_hpp_test.cc b/src/nanoarrow/ipc/ipc_hpp_test.cc index 0ff57dedf..393a0bd34 100644 --- a/src/nanoarrow/ipc/ipc_hpp_test.cc +++ b/src/nanoarrow/ipc/ipc_hpp_test.cc @@ -19,22 +19,6 @@ #include "nanoarrow/nanoarrow_ipc.hpp" -TEST(NanoarrowIpcHppTest, NanoarrowIpcHppTestUniqueSharedBuffer) { - nanoarrow::ipc::UniqueSharedBuffer shared_buffer; - - nanoarrow::UniqueBuffer buffer; - ASSERT_EQ(ArrowBufferAppend(buffer.get(), "1234", 4), NANOARROW_OK); - - EXPECT_EQ(shared_buffer->private_src.data, nullptr); - ASSERT_EQ(ArrowIpcSharedBufferInit(shared_buffer.get(), buffer.get()), NANOARROW_OK); - EXPECT_NE(shared_buffer->private_src.data, nullptr); - - nanoarrow::ipc::UniqueSharedBuffer shared_buffer2 = std::move(shared_buffer); - EXPECT_NE(shared_buffer2->private_src.data, nullptr); - EXPECT_EQ(shared_buffer->private_src.data, // NOLINT(clang-analyzer-cplusplus.Move) - nullptr); -} - TEST(NanoarrowIpcHppTest, NanoarrowIpcHppTestUniqueDecoder) { nanoarrow::ipc::UniqueDecoder decoder; diff --git a/src/nanoarrow/ipc/reader.c b/src/nanoarrow/ipc/reader.c index 897508241..f69d441f2 100644 --- a/src/nanoarrow/ipc/reader.c +++ b/src/nanoarrow/ipc/reader.c @@ -459,14 +459,14 @@ static int ArrowIpcArrayStreamReaderProcessRecordBatch( NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data)); if (private_data->use_shared_buffers) { - struct ArrowIpcSharedBuffer shared; + struct ArrowBuffer shared; NANOARROW_RETURN_NOT_OK_WITH_ERROR( - ArrowIpcSharedBufferInit(&shared, &private_data->body), &private_data->error); - int result = ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( + ArrowSharedBufferInit(&shared, &private_data->body), &private_data->error); + ArrowErrorCode result = ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( &private_data->decoder, &shared, private_data->field_index, 
&private_data->dictionaries, out, NANOARROW_VALIDATION_LEVEL_FULL, &private_data->error); - ArrowIpcSharedBufferReset(&shared); + ArrowBufferReset(&shared); NANOARROW_RETURN_NOT_OK(result); } else { struct ArrowBufferView body_view; @@ -489,13 +489,13 @@ static int ArrowIpcArrayStreamReaderProcessDictionary( if (private_data->use_shared_buffers) { // Decode the dictionary - struct ArrowIpcSharedBuffer shared; + struct ArrowBuffer shared; NANOARROW_RETURN_NOT_OK_WITH_ERROR( - ArrowIpcSharedBufferInit(&shared, &private_data->body), &private_data->error); + ArrowSharedBufferInit(&shared, &private_data->body), &private_data->error); int result = ArrowIpcDecoderDecodeDictionaryFromShared( &private_data->decoder, &shared, NANOARROW_VALIDATION_LEVEL_FULL, &private_data->dictionaries, &private_data->error); - ArrowIpcSharedBufferReset(&shared); + ArrowBufferReset(&shared); NANOARROW_RETURN_NOT_OK(result); } else { struct ArrowBufferView body_view; @@ -603,7 +603,7 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit( private_data->use_shared_buffers = options->use_shared_buffers; } else { private_data->field_index = -1; - private_data->use_shared_buffers = ArrowIpcSharedBufferIsThreadSafe(); + private_data->use_shared_buffers = ArrowSharedBufferIsThreadSafe(); } out->private_data = private_data; diff --git a/src/nanoarrow/nanoarrow.h b/src/nanoarrow/nanoarrow.h index c22ef12e5..1ec13876d 100644 --- a/src/nanoarrow/nanoarrow.h +++ b/src/nanoarrow/nanoarrow.h @@ -45,6 +45,17 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowBufferAllocatorDefault) #define ArrowBufferDeallocator \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowBufferDeallocator) +#define ArrowSharedBufferInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedBufferInit) +#define ArrowSharedBufferIsThreadSafe \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedBufferIsThreadSafe) +#define ArrowSharedBufferClone \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedBufferClone) +#define ArrowSharedArrayInit 
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedArrayInit) +#define ArrowSharedArrayRelease \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedArrayRelease) +#define ArrowSharedArrayBuffer \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedArrayBuffer) +#define ArrowIsSharedBuffer NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIsSharedBuffer) #define ArrowErrorSet NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowErrorSet) #define ArrowLayoutInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowLayoutInit) #define ArrowDecimalSetDigits NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDecimalSetDigits) @@ -94,6 +105,7 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowMetadataBuilderRemove) #define ArrowSchemaViewInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSchemaViewInit) #define ArrowSchemaToString NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSchemaToString) +#define ArrowArrayIsInternal NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowArrayIsInternal) #define ArrowArrayInitFromType \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowArrayInitFromType) #define ArrowArrayInitFromSchema \ @@ -114,6 +126,8 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowArrayFinishBuilding) #define ArrowArrayFinishBuildingDefault \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowArrayFinishBuildingDefault) +#define ArrowArrayMoveShared NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowArrayMoveShared) +#define ArrowArrayCloneShared NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowArrayCloneShared) #define ArrowArrayViewInitFromType \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowArrayViewInitFromType) #define ArrowArrayViewInitFromSchema \ @@ -203,6 +217,85 @@ NANOARROW_DLL struct ArrowBufferAllocator ArrowBufferDeallocator( /// @} +/// \defgroup nanoarrow-shared-buffer Shared buffers and arrays +/// +/// The nanoarrow library implements two kinds of shared buffers: those backed by +/// a reference-counted ArrowBuffer and those backed by a reference-counted ArrowArray. 
+/// These are useful for building ArrowArrays using buffers without copying the bytes +/// of the source. For example, the IPC implementation uses shared buffers derived from an +/// ArrowBuffer to avoid copying buffers from an IPC message, and the shared buffers +/// derived from arrays are used to make a safe shallow clone of an array. +/// +/// Reference counts are implemented using C11 atomics and are not necessarily thread safe +/// on all platforms (notably, MSVC requires `/experimental:c11atomics`). Use +/// ArrowSharedBufferIsThreadSafe() to check for thread safety if this is needed. +/// +/// @{ + +/// \brief Initialize a shared buffer from an ArrowBuffer +/// +/// If NANOARROW_OK is returned, shared is a reference-counted buffer that +/// has taken ownership of src. The caller should release the shared buffer +/// using ArrowBufferReset() when finished. The underlying data will persist +/// until all clones have also been released. +NANOARROW_DLL ArrowErrorCode ArrowSharedBufferInit(struct ArrowBuffer* shared, + struct ArrowBuffer* src); + +/// \brief Check for shared buffer thread safety +/// +/// Thread-safe shared buffers require C11 and the stdatomic.h header. +/// If either is unavailable, shared buffers are still possible but +/// the resulting arrays must not be passed to other threads to be released. +NANOARROW_DLL int ArrowSharedBufferIsThreadSafe(void); + +/// \brief Check if a buffer is a shared buffer +/// +/// Returns non-zero if buffer was created by ArrowSharedBufferInit() or +/// obtained from ArrowSharedArrayBuffer(). +NANOARROW_DLL int ArrowIsSharedBuffer(struct ArrowBuffer* buffer); + +/// \brief Clone a shared buffer, incrementing its reference count +/// +/// This creates a new ArrowBuffer that shares the same underlying data with the +/// original shared buffer. The reference count is incremented. Returns EINVAL +/// if shared is not a shared buffer (i.e. ArrowIsSharedBuffer() returns false).
+NANOARROW_DLL ArrowErrorCode ArrowSharedBufferClone(struct ArrowBuffer* shared, + struct ArrowBuffer* shared_out); + +/// \brief An opaque handle to a shared, reference-counted ArrowArray +struct ArrowSharedArray { + void* private_data; +}; + +/// \brief Initialize a shared array from an ArrowArray +/// +/// Takes ownership of src (sets src->release to NULL). The shared array +/// should be released with ArrowSharedArrayRelease() when all buffers have +/// been extracted from the source. The original array will be released +/// when all borrowed buffers have been released. +NANOARROW_DLL ArrowErrorCode ArrowSharedArrayInit(struct ArrowSharedArray* shared, + struct ArrowArray* src); + +/// \brief Release the caller's reference to a shared array +/// +/// Decrements the reference count. When no more references remain +/// (including any buffers obtained via ArrowSharedArrayBuffer()), +/// the underlying ArrowArray is released. +NANOARROW_DLL void ArrowSharedArrayRelease(struct ArrowSharedArray* shared); + +/// \brief Obtain a buffer from a shared array +/// +/// Returns an ArrowBuffer that views buffer i of the underlying ArrowArray. +/// If the source array was built with nanoarrow (i.e. ArrowArrayIsInternal() +/// returns true), buffer sizes are known and propagated to the output. +/// Otherwise the output buffer's size_bytes will be 0. The shared array's +/// reference count is incremented. The returned buffer is compatible with +/// ArrowSharedBufferClone(). 
+NANOARROW_DLL ArrowErrorCode ArrowSharedArrayBuffer(struct ArrowSharedArray* shared, + int64_t i, struct ArrowBuffer* out); + +/// @} + /// \brief Move the contents of an src ArrowSchema into dst and set src->release to NULL /// \ingroup nanoarrow-arrow-cdata static inline void ArrowSchemaMove(struct ArrowSchema* src, struct ArrowSchema* dst); @@ -1038,6 +1131,12 @@ static inline ArrowErrorCode ArrowArrayShrinkToFit(struct ArrowArray* array); NANOARROW_DLL ArrowErrorCode ArrowArrayFinishBuildingDefault(struct ArrowArray* array, struct ArrowError* error); +/// \brief Check if an ArrowArray was allocated by nanoarrow +/// +/// Returns non-zero if array was allocated using ArrowArrayInitFromType(), +/// ArrowArrayInitFromSchema(), or ArrowArrayInitFromArrayView(). +NANOARROW_DLL int ArrowArrayIsInternal(struct ArrowArray* array); + /// \brief Finish building an ArrowArray with explicit validation /// /// Finish building with an explicit validation level. This could perform less validation @@ -1049,6 +1148,25 @@ NANOARROW_DLL ArrowErrorCode ArrowArrayFinishBuilding( struct ArrowArray* array, enum ArrowValidationLevel validation_level, struct ArrowError* error); +/// \brief Create a shared copy of an ArrowArray +/// +/// Recursively converts array into a version where all buffers are +/// reference-counted. On success, shared is a new ArrowArray whose buffers +/// are backed by ArrowSharedArray references, and array is consumed +/// (release set to NULL). The resulting shared array can be safely moved +/// or have its buffers cloned via ArrowSharedBufferClone(). +NANOARROW_DLL ArrowErrorCode ArrowArrayMoveShared(struct ArrowArray* array, + struct ArrowArray* shared); + +/// \brief Clone a shared ArrowArray +/// +/// Creates a new ArrowArray whose buffers share the same underlying data as +/// the source shared array. The source must have been created by +/// ArrowArrayMoveShared() (i.e. all buffers are reference-counted). 
+/// Returns EINVAL if shared is not a shared array. +NANOARROW_DLL ArrowErrorCode ArrowArrayCloneShared(struct ArrowArray* shared, + struct ArrowArray* array); + /// @} /// \defgroup nanoarrow-array-view Reading arrays diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h index f4c8ef2cf..3632bd777 100644 --- a/src/nanoarrow/nanoarrow_ipc.h +++ b/src/nanoarrow/nanoarrow_ipc.h @@ -23,12 +23,8 @@ #ifdef NANOARROW_NAMESPACE #define ArrowIpcCheckRuntime NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcCheckRuntime) -#define ArrowIpcSharedBufferIsThreadSafe \ - NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferIsThreadSafe) -#define ArrowIpcSharedBufferInit \ - NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferInit) -#define ArrowIpcSharedBufferReset \ - NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferReset) +#define ArrowIpcSharedBufferIsThreadSafe ArrowSharedBufferIsThreadSafe +#define ArrowIpcSharedBufferInit ArrowSharedBufferInit #define ArrowIpcGetZstdDecompressionFunction \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcGetZstdDecompressionFunction) #define ArrowIpcGetLZ4DecompressionFunction \ @@ -141,6 +137,12 @@ #define ArrowIpcDecoderDecodeDictionaryFromShared \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeDictionaryFromShared) +#else + +// Backward-compatible aliases (non-namespaced builds) +#define ArrowIpcSharedBufferIsThreadSafe ArrowSharedBufferIsThreadSafe +#define ArrowIpcSharedBufferInit ArrowSharedBufferInit + #endif #ifdef __cplusplus @@ -333,33 +335,6 @@ static inline enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) { } } -/// \brief A structure representing a reference-counted buffer that may be passed to -/// ArrowIpcDecoderDecodeArrayFromShared(). -struct ArrowIpcSharedBuffer { - struct ArrowBuffer private_src; -}; - -/// \brief Initialize the contents of a ArrowIpcSharedBuffer struct -/// -/// If NANOARROW_OK is returned, the ArrowIpcSharedBuffer takes ownership of -/// src. 
-NANOARROW_DLL ArrowErrorCode ArrowIpcSharedBufferInit(struct ArrowIpcSharedBuffer* shared, - struct ArrowBuffer* src); - -/// \brief Release the caller's copy of the shared buffer -/// -/// When finished, the caller must relinquish its own copy of the shared data -/// using this function. The original buffer will continue to exist until all -/// ArrowArray objects that refer to it have also been released. -NANOARROW_DLL void ArrowIpcSharedBufferReset(struct ArrowIpcSharedBuffer* shared); - -/// \brief Check for shared buffer thread safety -/// -/// Thread-safe shared buffers require C11 and the stdatomic.h header. -/// If either are unavailable, shared buffers are still possible but -/// the resulting arrays must not be passed to other threads to be released. -NANOARROW_DLL int ArrowIpcSharedBufferIsThreadSafe(void); - /// \brief A user-extensible decompressor /// /// The ArrowIpcDecompressor is the underlying object that enables decompression in the @@ -664,12 +639,12 @@ NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArray( /// \brief Decode an ArrowArray from an owned buffer with dictionary decoding support /// /// This implementation takes advantage of the fact that it can avoid copying individual -/// buffers. In all cases the caller must ArrowIpcSharedBufferReset() body after one or +/// buffers. In all cases the caller must ArrowBufferReset() body after one or /// more calls to ArrowIpcDecoderDecodeArrayFromShared(). If -/// ArrowIpcSharedBufferIsThreadSafe() returns 0, out must not be released by another +/// ArrowSharedBufferIsThreadSafe() returns 0, out must not be released by another /// thread. 
NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( - struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared, int64_t i, + struct ArrowIpcDecoder* decoder, struct ArrowBuffer* shared, int64_t i, struct ArrowIpcDictionaries* dictionaries, struct ArrowArray* out, enum ArrowValidationLevel validation_level, struct ArrowError* error); @@ -678,7 +653,7 @@ NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayFromSharedWithDictionarie /// Equivalent to calling ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries() with /// dictionaries as NULL. NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared( - struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared, int64_t i, + struct ArrowIpcDecoder* decoder, struct ArrowBuffer* shared, int64_t i, struct ArrowArray* out, enum ArrowValidationLevel validation_level, struct ArrowError* error); @@ -698,12 +673,12 @@ NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionary( /// \brief Decode an ArrowArray from a dictionary batch from an owned buffer /// /// This implementation takes advantage of the fact that it can avoid copying individual -/// buffers. In all cases the caller must ArrowIpcSharedBufferReset() body after one or +/// buffers. In all cases the caller must ArrowBufferReset() body after one or /// more calls to ArrowIpcDecoderDecodeArrayFromShared(). If -/// ArrowIpcSharedBufferIsThreadSafe() returns 0, no batches decoded using out may +/// ArrowSharedBufferIsThreadSafe() returns 0, no batches decoded using out may /// be released from another thread. 
NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionaryFromShared( - struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared, + struct ArrowIpcDecoder* decoder, struct ArrowBuffer* shared, enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries* out, struct ArrowError* error); @@ -760,7 +735,7 @@ struct ArrowIpcArrayStreamReaderOptions { /// (since unreferenced portions of the file are often not loaded into memory) or /// (2) if all data from all columns are about to be referenced anyway. When loading /// a single field there is probably no advantage to using shared buffers. - /// Defaults to the value of ArrowIpcSharedBufferIsThreadSafe(). + /// Defaults to the value of ArrowSharedBufferIsThreadSafe(). int use_shared_buffers; }; diff --git a/src/nanoarrow/nanoarrow_ipc.hpp b/src/nanoarrow/nanoarrow_ipc.hpp index 604a21471..e6b23924f 100644 --- a/src/nanoarrow/nanoarrow_ipc.hpp +++ b/src/nanoarrow/nanoarrow_ipc.hpp @@ -25,22 +25,6 @@ namespace nanoarrow { namespace internal { -template <> -inline void init_pointer(struct ArrowIpcSharedBuffer* data) { - init_pointer(&data->private_src); -} - -template <> -inline void move_pointer(struct ArrowIpcSharedBuffer* src, - struct ArrowIpcSharedBuffer* dst) { - move_pointer(&src->private_src, &dst->private_src); -} - -template <> -inline void release_pointer(struct ArrowIpcSharedBuffer* data) { - ArrowIpcSharedBufferReset(data); -} - template <> inline void init_pointer(struct ArrowIpcDecoder* data) { data->private_data = nullptr; @@ -213,9 +197,6 @@ namespace ipc { /// /// @{ -/// \brief Class wrapping a unique struct ArrowIpcSharedBuffer -using UniqueSharedBuffer = internal::Unique; - /// \brief Class wrapping a unique struct ArrowIpcDecoder using UniqueDecoder = internal::Unique;