From a915c4adf64d3897e499a9c230da5e7a273c12cb Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sun, 5 Apr 2026 21:04:30 -0500 Subject: [PATCH 01/10] move existing implementation to utils.c --- .github/workflows/build-and-test-ipc.yaml | 2 +- src/nanoarrow/common/utils.c | 121 +++++++++++++++++++++ src/nanoarrow/hpp/unique.hpp | 19 ++++ src/nanoarrow/ipc/decoder.c | 123 +--------------------- src/nanoarrow/nanoarrow.h | 51 +++++++++ src/nanoarrow/nanoarrow_ipc.h | 41 +++----- src/nanoarrow/nanoarrow_ipc.hpp | 19 +--- 7 files changed, 207 insertions(+), 169 deletions(-) diff --git a/.github/workflows/build-and-test-ipc.yaml b/.github/workflows/build-and-test-ipc.yaml index 2ae83deaf..22df7e325 100644 --- a/.github/workflows/build-and-test-ipc.yaml +++ b/.github/workflows/build-and-test-ipc.yaml @@ -44,7 +44,7 @@ jobs: matrix: config: - {label: default-build, cmake_args: "-DNANOARROW_BUILD_APPS=ON -DNANOARROW_IPC_WITH_ZSTD=ON -DNANOARROW_IPC_WITH_LZ4=ON"} - - {label: default-noatomics, cmake_args: "-DCMAKE_C_FLAGS='-DNANOARROW_IPC_USE_STDATOMIC=0'"} + - {label: default-noatomics, cmake_args: "-DCMAKE_C_FLAGS='-DNANOARROW_USE_STDATOMIC=0'"} - {label: shared-test-linkage, cmake_args: "-DNANOARROW_TEST_LINKAGE_SHARED=ON"} - {label: namespaced-build, cmake_args: "-DNANOARROW_NAMESPACE=SomeUserNamespace"} - {label: bundled-build, cmake_args: "-DNANOARROW_BUNDLE=ON"} diff --git a/src/nanoarrow/common/utils.c b/src/nanoarrow/common/utils.c index 5b5116a12..7b6e3fa92 100644 --- a/src/nanoarrow/common/utils.c +++ b/src/nanoarrow/common/utils.c @@ -22,6 +22,29 @@ #include #include +// For thread safe shared buffers we need C11 + stdatomic.h +// Can compile with -DNANOARROW_USE_STDATOMIC=0 or 1 to override +// automatic detection +#if !defined(NANOARROW_USE_STDATOMIC) +#define NANOARROW_USE_STDATOMIC 0 + +// Check for C11 +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L + +// Check for GCC 4.8, which doesn't include stdatomic.h but does +// not define 
__STDC_NO_ATOMICS__ +#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ >= 5 + +#if !defined(__STDC_NO_ATOMICS__) +#include +#undef NANOARROW_USE_STDATOMIC +#define NANOARROW_USE_STDATOMIC 1 +#endif +#endif +#endif + +#endif + #include "nanoarrow/nanoarrow.h" const char* ArrowNanoarrowVersion(void) { return NANOARROW_VERSION; } @@ -276,6 +299,104 @@ struct ArrowBufferAllocator ArrowBufferDeallocator( return allocator; } +#if NANOARROW_USE_STDATOMIC +struct ArrowSharedBufferPrivate { + struct ArrowBuffer src; + atomic_long reference_count; +}; + +static int64_t ArrowSharedBufferUpdate(struct ArrowSharedBufferPrivate* private_data, + int delta) { + int64_t old_count = atomic_fetch_add(&private_data->reference_count, delta); + return old_count + delta; +} + +static void ArrowSharedBufferSet(struct ArrowSharedBufferPrivate* private_data, + int64_t count) { + atomic_store(&private_data->reference_count, count); +} + +int ArrowSharedBufferIsThreadSafe(void) { return 1; } +#else +struct ArrowSharedBufferPrivate { + struct ArrowBuffer src; + int64_t reference_count; +}; + +static int64_t ArrowSharedBufferUpdate(struct ArrowSharedBufferPrivate* private_data, + int delta) { + private_data->reference_count += delta; + return private_data->reference_count; +} + +static void ArrowSharedBufferSet(struct ArrowSharedBufferPrivate* private_data, + int64_t count) { + private_data->reference_count = count; +} + +int ArrowSharedBufferIsThreadSafe(void) { return 0; } +#endif + +static void ArrowSharedBufferFree(struct ArrowBufferAllocator* allocator, uint8_t* ptr, + int64_t size) { + NANOARROW_UNUSED(allocator); + NANOARROW_UNUSED(ptr); + NANOARROW_UNUSED(size); + + struct ArrowSharedBufferPrivate* private_data = + (struct ArrowSharedBufferPrivate*)allocator->private_data; + + if (ArrowSharedBufferUpdate(private_data, -1) == 0) { + ArrowBufferReset(&private_data->src); + ArrowFree(private_data); + } +} + +ArrowErrorCode ArrowSharedBufferInit(struct ArrowSharedBuffer* shared, + 
struct ArrowBuffer* src) { + if (src->data == NULL) { + ArrowBufferMove(src, &shared->private_src); + return NANOARROW_OK; + } + + struct ArrowSharedBufferPrivate* private_data = + (struct ArrowSharedBufferPrivate*)ArrowMalloc( + sizeof(struct ArrowSharedBufferPrivate)); + if (private_data == NULL) { + return ENOMEM; + } + + ArrowBufferMove(src, &private_data->src); + ArrowSharedBufferSet(private_data, 1); + + ArrowBufferInit(&shared->private_src); + shared->private_src.data = private_data->src.data; + shared->private_src.size_bytes = private_data->src.size_bytes; + // Don't expose any extra capcity from src so that any calls to ArrowBufferAppend + // on this buffer will fail. + shared->private_src.capacity_bytes = private_data->src.size_bytes; + shared->private_src.allocator = + ArrowBufferDeallocator(&ArrowSharedBufferFree, private_data); + return NANOARROW_OK; +} + +void ArrowSharedBufferClone(struct ArrowSharedBuffer* shared, + struct ArrowBuffer* shared_out) { + if (shared->private_src.size_bytes == 0) { + ArrowBufferInit(shared_out); + return; + } + + struct ArrowSharedBufferPrivate* private_data = + (struct ArrowSharedBufferPrivate*)shared->private_src.allocator.private_data; + ArrowSharedBufferUpdate(private_data, 1); + memcpy(shared_out, shared, sizeof(struct ArrowBuffer)); +} + +void ArrowSharedBufferReset(struct ArrowSharedBuffer* shared) { + ArrowBufferReset(&shared->private_src); +} + static const int kInt32DecimalDigits = 9; static const uint64_t kUInt32PowersOfTen[] = { diff --git a/src/nanoarrow/hpp/unique.hpp b/src/nanoarrow/hpp/unique.hpp index a67f4fcdd..7fb569f7d 100644 --- a/src/nanoarrow/hpp/unique.hpp +++ b/src/nanoarrow/hpp/unique.hpp @@ -135,6 +135,22 @@ inline void release_pointer(struct ArrowArrayView* data) { ArrowArrayViewReset(data); } +template <> +inline void init_pointer(struct ArrowSharedBuffer* data) { + init_pointer(&data->private_src); +} + +template <> +inline void move_pointer(struct ArrowSharedBuffer* src, + struct 
ArrowSharedBuffer* dst) { + move_pointer(&src->private_src, &dst->private_src); +} + +template <> +inline void release_pointer(struct ArrowSharedBuffer* data) { + ArrowSharedBufferReset(data); +} + /// \brief A unique_ptr-like base class for stack-allocatable objects /// \tparam T The object type template @@ -219,6 +235,9 @@ using UniqueBitmap = internal::Unique; /// \brief Class wrapping a unique struct ArrowArrayView using UniqueArrayView = internal::Unique; +/// \brief Class wrapping a unique struct ArrowSharedBuffer +using UniqueSharedBuffer = internal::Unique; + /// @} NANOARROW_CXX_NAMESPACE_END diff --git a/src/nanoarrow/ipc/decoder.c b/src/nanoarrow/ipc/decoder.c index 4b0239c13..d9d5ed9f5 100644 --- a/src/nanoarrow/ipc/decoder.c +++ b/src/nanoarrow/ipc/decoder.c @@ -21,29 +21,6 @@ #include #include -// For thread safe shared buffers we need C11 + stdatomic.h -// Can compile with -DNANOARROW_IPC_USE_STDATOMIC=0 or 1 to override -// automatic detection -#if !defined(NANOARROW_IPC_USE_STDATOMIC) -#define NANOARROW_IPC_USE_STDATOMIC 0 - -// Check for C11 -#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L - -// Check for GCC 4.8, which doesn't include stdatomic.h but does -// not define __STDC_NO_ATOMICS__ -#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ >= 5 - -#if !defined(__STDC_NO_ATOMICS__) -#include -#undef NANOARROW_IPC_USE_STDATOMIC -#define NANOARROW_IPC_USE_STDATOMIC 1 -#endif -#endif -#endif - -#endif - #include "nanoarrow/ipc/flatcc_generated.h" #include "nanoarrow/nanoarrow.h" #include "nanoarrow/nanoarrow_ipc.h" @@ -169,104 +146,6 @@ ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error) { return NANOARROW_OK; } -#if NANOARROW_IPC_USE_STDATOMIC -struct ArrowIpcSharedBufferPrivate { - struct ArrowBuffer src; - atomic_long reference_count; -}; - -static int64_t ArrowIpcSharedBufferUpdate( - struct ArrowIpcSharedBufferPrivate* private_data, int delta) { - int64_t old_count = atomic_fetch_add(&private_data->reference_count, 
delta); - return old_count + delta; -} - -static void ArrowIpcSharedBufferSet(struct ArrowIpcSharedBufferPrivate* private_data, - int64_t count) { - atomic_store(&private_data->reference_count, count); -} - -int ArrowIpcSharedBufferIsThreadSafe(void) { return 1; } -#else -struct ArrowIpcSharedBufferPrivate { - struct ArrowBuffer src; - int64_t reference_count; -}; - -static int64_t ArrowIpcSharedBufferUpdate( - struct ArrowIpcSharedBufferPrivate* private_data, int delta) { - private_data->reference_count += delta; - return private_data->reference_count; -} - -static void ArrowIpcSharedBufferSet(struct ArrowIpcSharedBufferPrivate* private_data, - int64_t count) { - private_data->reference_count = count; -} - -int ArrowIpcSharedBufferIsThreadSafe(void) { return 0; } -#endif - -static void ArrowIpcSharedBufferFree(struct ArrowBufferAllocator* allocator, uint8_t* ptr, - int64_t size) { - NANOARROW_UNUSED(allocator); - NANOARROW_UNUSED(ptr); - NANOARROW_UNUSED(size); - - struct ArrowIpcSharedBufferPrivate* private_data = - (struct ArrowIpcSharedBufferPrivate*)allocator->private_data; - - if (ArrowIpcSharedBufferUpdate(private_data, -1) == 0) { - ArrowBufferReset(&private_data->src); - ArrowFree(private_data); - } -} - -ArrowErrorCode ArrowIpcSharedBufferInit(struct ArrowIpcSharedBuffer* shared, - struct ArrowBuffer* src) { - if (src->data == NULL) { - ArrowBufferMove(src, &shared->private_src); - return NANOARROW_OK; - } - - struct ArrowIpcSharedBufferPrivate* private_data = - (struct ArrowIpcSharedBufferPrivate*)ArrowMalloc( - sizeof(struct ArrowIpcSharedBufferPrivate)); - if (private_data == NULL) { - return ENOMEM; - } - - ArrowBufferMove(src, &private_data->src); - ArrowIpcSharedBufferSet(private_data, 1); - - ArrowBufferInit(&shared->private_src); - shared->private_src.data = private_data->src.data; - shared->private_src.size_bytes = private_data->src.size_bytes; - // Don't expose any extra capcity from src so that any calls to ArrowBufferAppend - // on this buffer 
will fail. - shared->private_src.capacity_bytes = private_data->src.size_bytes; - shared->private_src.allocator = - ArrowBufferDeallocator(&ArrowIpcSharedBufferFree, private_data); - return NANOARROW_OK; -} - -static void ArrowIpcSharedBufferClone(struct ArrowIpcSharedBuffer* shared, - struct ArrowBuffer* shared_out) { - if (shared->private_src.size_bytes == 0) { - ArrowBufferInit(shared_out); - return; - } - - struct ArrowIpcSharedBufferPrivate* private_data = - (struct ArrowIpcSharedBufferPrivate*)shared->private_src.allocator.private_data; - ArrowIpcSharedBufferUpdate(private_data, 1); - memcpy(shared_out, shared, sizeof(struct ArrowBuffer)); -} - -void ArrowIpcSharedBufferReset(struct ArrowIpcSharedBuffer* shared) { - ArrowBufferReset(&shared->private_src); -} - static int ArrowIpcDecoderNeedsSwapEndian(struct ArrowIpcDecoder* decoder) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; @@ -2193,7 +2072,7 @@ static ArrowErrorCode ArrowIpcMakeBufferFromShared(struct ArrowIpcBufferFactory* if (!needs_decompression) { ArrowBufferReset(dst); - ArrowIpcSharedBufferClone(shared, dst); + ArrowSharedBufferClone(shared, dst); dst->data += src->body_offset_bytes + uncompressed_data_offset; dst->size_bytes = src->buffer_length_bytes - uncompressed_data_offset; } diff --git a/src/nanoarrow/nanoarrow.h b/src/nanoarrow/nanoarrow.h index c22ef12e5..255573cc1 100644 --- a/src/nanoarrow/nanoarrow.h +++ b/src/nanoarrow/nanoarrow.h @@ -45,6 +45,14 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowBufferAllocatorDefault) #define ArrowBufferDeallocator \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowBufferDeallocator) +#define ArrowSharedBufferInit \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedBufferInit) +#define ArrowSharedBufferReset \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedBufferReset) +#define ArrowSharedBufferIsThreadSafe \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedBufferIsThreadSafe) +#define 
ArrowSharedBufferClone \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedBufferClone) #define ArrowErrorSet NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowErrorSet) #define ArrowLayoutInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowLayoutInit) #define ArrowDecimalSetDigits NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDecimalSetDigits) @@ -203,6 +211,49 @@ NANOARROW_DLL struct ArrowBufferAllocator ArrowBufferDeallocator( /// @} +/// \defgroup nanoarrow-shared-buffer Shared buffers +/// +/// A shared buffer is a reference-counted ArrowBuffer that may be shared +/// across multiple ArrowArray objects. This avoids copying when decoding +/// arrays from a single source buffer. +/// +/// @{ + +/// \brief A structure representing a reference-counted buffer +struct ArrowSharedBuffer { + struct ArrowBuffer private_src; +}; + +/// \brief Initialize the contents of an ArrowSharedBuffer struct +/// +/// If NANOARROW_OK is returned, the ArrowSharedBuffer takes ownership of +/// src. +NANOARROW_DLL ArrowErrorCode ArrowSharedBufferInit(struct ArrowSharedBuffer* shared, + struct ArrowBuffer* src); + +/// \brief Release the caller's copy of the shared buffer +/// +/// When finished, the caller must relinquish its own copy of the shared data +/// using this function. The original buffer will continue to exist until all +/// ArrowArray objects that refer to it have also been released. +NANOARROW_DLL void ArrowSharedBufferReset(struct ArrowSharedBuffer* shared); + +/// \brief Check for shared buffer thread safety +/// +/// Thread-safe shared buffers require C11 and the stdatomic.h header. +/// If either are unavailable, shared buffers are still possible but +/// the resulting arrays must not be passed to other threads to be released. +NANOARROW_DLL int ArrowSharedBufferIsThreadSafe(void); + +/// \brief Clone a shared buffer, incrementing its reference count +/// +/// This creates a new ArrowBuffer that shares the same underlying data with the +/// original shared buffer. 
The reference count is incremented. +NANOARROW_DLL void ArrowSharedBufferClone(struct ArrowSharedBuffer* shared, + struct ArrowBuffer* shared_out); + +/// @} + /// \brief Move the contents of an src ArrowSchema into dst and set src->release to NULL /// \ingroup nanoarrow-arrow-cdata static inline void ArrowSchemaMove(struct ArrowSchema* src, struct ArrowSchema* dst); diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h index f4c8ef2cf..4b3036150 100644 --- a/src/nanoarrow/nanoarrow_ipc.h +++ b/src/nanoarrow/nanoarrow_ipc.h @@ -23,12 +23,9 @@ #ifdef NANOARROW_NAMESPACE #define ArrowIpcCheckRuntime NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcCheckRuntime) -#define ArrowIpcSharedBufferIsThreadSafe \ - NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferIsThreadSafe) -#define ArrowIpcSharedBufferInit \ - NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferInit) -#define ArrowIpcSharedBufferReset \ - NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferReset) +#define ArrowIpcSharedBufferIsThreadSafe ArrowSharedBufferIsThreadSafe +#define ArrowIpcSharedBufferInit ArrowSharedBufferInit +#define ArrowIpcSharedBufferReset ArrowSharedBufferReset #define ArrowIpcGetZstdDecompressionFunction \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcGetZstdDecompressionFunction) #define ArrowIpcGetLZ4DecompressionFunction \ @@ -141,6 +138,13 @@ #define ArrowIpcDecoderDecodeDictionaryFromShared \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeDictionaryFromShared) +#else + +// Backward-compatible aliases (non-namespaced builds) +#define ArrowIpcSharedBufferIsThreadSafe ArrowSharedBufferIsThreadSafe +#define ArrowIpcSharedBufferInit ArrowSharedBufferInit +#define ArrowIpcSharedBufferReset ArrowSharedBufferReset + #endif #ifdef __cplusplus @@ -335,30 +339,9 @@ static inline enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) { /// \brief A structure representing a reference-counted buffer that may be passed to /// 
ArrowIpcDecoderDecodeArrayFromShared(). -struct ArrowIpcSharedBuffer { - struct ArrowBuffer private_src; -}; - -/// \brief Initialize the contents of a ArrowIpcSharedBuffer struct -/// -/// If NANOARROW_OK is returned, the ArrowIpcSharedBuffer takes ownership of -/// src. -NANOARROW_DLL ArrowErrorCode ArrowIpcSharedBufferInit(struct ArrowIpcSharedBuffer* shared, - struct ArrowBuffer* src); - -/// \brief Release the caller's copy of the shared buffer -/// -/// When finished, the caller must relinquish its own copy of the shared data -/// using this function. The original buffer will continue to exist until all -/// ArrowArray objects that refer to it have also been released. -NANOARROW_DLL void ArrowIpcSharedBufferReset(struct ArrowIpcSharedBuffer* shared); - -/// \brief Check for shared buffer thread safety /// -/// Thread-safe shared buffers require C11 and the stdatomic.h header. -/// If either are unavailable, shared buffers are still possible but -/// the resulting arrays must not be passed to other threads to be released. -NANOARROW_DLL int ArrowIpcSharedBufferIsThreadSafe(void); +/// \deprecated Use ArrowSharedBuffer instead. 
+#define ArrowIpcSharedBuffer ArrowSharedBuffer /// \brief A user-extensible decompressor /// diff --git a/src/nanoarrow/nanoarrow_ipc.hpp b/src/nanoarrow/nanoarrow_ipc.hpp index 604a21471..0b231cd4f 100644 --- a/src/nanoarrow/nanoarrow_ipc.hpp +++ b/src/nanoarrow/nanoarrow_ipc.hpp @@ -25,22 +25,6 @@ namespace nanoarrow { namespace internal { -template <> -inline void init_pointer(struct ArrowIpcSharedBuffer* data) { - init_pointer(&data->private_src); -} - -template <> -inline void move_pointer(struct ArrowIpcSharedBuffer* src, - struct ArrowIpcSharedBuffer* dst) { - move_pointer(&src->private_src, &dst->private_src); -} - -template <> -inline void release_pointer(struct ArrowIpcSharedBuffer* data) { - ArrowIpcSharedBufferReset(data); -} - template <> inline void init_pointer(struct ArrowIpcDecoder* data) { data->private_data = nullptr; @@ -213,7 +197,8 @@ namespace ipc { /// /// @{ -/// \brief Class wrapping a unique struct ArrowIpcSharedBuffer +/// \brief Class wrapping a unique struct ArrowSharedBuffer +/// \deprecated Use nanoarrow::UniqueSharedBuffer instead. 
using UniqueSharedBuffer = internal::Unique; /// \brief Class wrapping a unique struct ArrowIpcDecoder From 6b08f687089c2a3c4468dfaba5e34187145f4df4 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sun, 5 Apr 2026 21:10:46 -0500 Subject: [PATCH 02/10] add standalone tests --- src/nanoarrow/common/buffer_test.cc | 100 ++++++++++++++++++++++++++++ src/nanoarrow/hpp/unique_test.cc | 16 +++++ 2 files changed, 116 insertions(+) diff --git a/src/nanoarrow/common/buffer_test.cc b/src/nanoarrow/common/buffer_test.cc index e294d8f1a..1096faaca 100644 --- a/src/nanoarrow/common/buffer_test.cc +++ b/src/nanoarrow/common/buffer_test.cc @@ -18,6 +18,7 @@ #include #include #include +#include #include @@ -704,3 +705,102 @@ TEST(BitmapTest, BitmapTestAppendInt32Unsafe) { ArrowBitmapReset(&bitmap); } + +TEST(SharedBufferTest, SharedBufferInitResetEmpty) { + struct ArrowBuffer src; + ArrowBufferInit(&src); + + struct ArrowSharedBuffer shared; + ASSERT_EQ(ArrowSharedBufferInit(&shared, &src), NANOARROW_OK); + EXPECT_EQ(shared.private_src.data, nullptr); + ArrowSharedBufferReset(&shared); +} + +TEST(SharedBufferTest, SharedBufferInitReset) { + struct ArrowBuffer src; + ArrowBufferInit(&src); + ASSERT_EQ(ArrowBufferAppend(&src, "1234", 4), NANOARROW_OK); + + struct ArrowSharedBuffer shared; + ASSERT_EQ(ArrowSharedBufferInit(&shared, &src), NANOARROW_OK); + EXPECT_NE(shared.private_src.data, nullptr); + EXPECT_EQ(shared.private_src.size_bytes, 4); + EXPECT_EQ(memcmp(shared.private_src.data, "1234", 4), 0); + + ArrowSharedBufferReset(&shared); + EXPECT_EQ(shared.private_src.data, nullptr); + EXPECT_EQ(shared.private_src.size_bytes, 0); +} + +TEST(SharedBufferTest, SharedBufferClone) { + struct ArrowBuffer src; + ArrowBufferInit(&src); + ASSERT_EQ(ArrowBufferAppend(&src, "abcdef", 6), NANOARROW_OK); + + struct ArrowSharedBuffer shared; + ASSERT_EQ(ArrowSharedBufferInit(&shared, &src), NANOARROW_OK); + + // Clone the shared buffer + struct ArrowBuffer clone; + 
ArrowSharedBufferClone(&shared, &clone); + EXPECT_EQ(clone.data, shared.private_src.data); + EXPECT_EQ(clone.size_bytes, 6); + EXPECT_EQ(memcmp(clone.data, "abcdef", 6), 0); + + // Release the original shared buffer; clone should still be valid + ArrowSharedBufferReset(&shared); + EXPECT_EQ(memcmp(clone.data, "abcdef", 6), 0); + + ArrowBufferReset(&clone); +} + +TEST(SharedBufferTest, SharedBufferCloneEmpty) { + struct ArrowBuffer src; + ArrowBufferInit(&src); + + struct ArrowSharedBuffer shared; + ASSERT_EQ(ArrowSharedBufferInit(&shared, &src), NANOARROW_OK); + + struct ArrowBuffer clone; + ArrowSharedBufferClone(&shared, &clone); + EXPECT_EQ(clone.data, nullptr); + EXPECT_EQ(clone.size_bytes, 0); + + ArrowSharedBufferReset(&shared); + ArrowBufferReset(&clone); +} + +TEST(SharedBufferTest, SharedBufferThreadSafeClone) { + if (!ArrowSharedBufferIsThreadSafe()) { + GTEST_SKIP() << "ArrowSharedBufferIsThreadSafe() returned false"; + } + + struct ArrowBuffer src; + ArrowBufferInit(&src); + ASSERT_EQ(ArrowBufferAppend(&src, "abcdef", 6), NANOARROW_OK); + + struct ArrowSharedBuffer shared; + ASSERT_EQ(ArrowSharedBufferInit(&shared, &src), NANOARROW_OK); + + // Clone into multiple buffers + struct ArrowBuffer clones[10]; + for (int i = 0; i < 10; i++) { + ArrowSharedBufferClone(&shared, &clones[i]); + } + + // Release the original + ArrowSharedBufferReset(&shared); + + // Release clones from separate threads + std::thread threads[10]; + for (int i = 0; i < 10; i++) { + threads[i] = std::thread([&clones, i] { + EXPECT_EQ(memcmp(clones[i].data, "abcdef", 6), 0); + ArrowBufferReset(&clones[i]); + }); + } + + for (int i = 0; i < 10; i++) { + threads[i].join(); + } +} diff --git a/src/nanoarrow/hpp/unique_test.cc b/src/nanoarrow/hpp/unique_test.cc index d9d25cacf..9b1b77310 100644 --- a/src/nanoarrow/hpp/unique_test.cc +++ b/src/nanoarrow/hpp/unique_test.cc @@ -180,3 +180,19 @@ TEST(HppUnique, UniqueArrayView) { EXPECT_EQ(array_view2->storage_type, 
NANOARROW_TYPE_UNINITIALIZED); EXPECT_EQ(array_view3->storage_type, NANOARROW_TYPE_STRUCT); } + +TEST(HppUnique, UniqueSharedBuffer) { + nanoarrow::UniqueSharedBuffer shared_buffer; + + nanoarrow::UniqueBuffer buffer; + ASSERT_EQ(ArrowBufferAppend(buffer.get(), "1234", 4), NANOARROW_OK); + + EXPECT_EQ(shared_buffer->private_src.data, nullptr); + ASSERT_EQ(ArrowSharedBufferInit(shared_buffer.get(), buffer.get()), NANOARROW_OK); + EXPECT_NE(shared_buffer->private_src.data, nullptr); + + nanoarrow::UniqueSharedBuffer shared_buffer2 = std::move(shared_buffer); + EXPECT_NE(shared_buffer2->private_src.data, nullptr); + EXPECT_EQ(shared_buffer->private_src.data, // NOLINT(clang-analyzer-cplusplus.Move) + nullptr); +} From 0670e40be9b8d9999bde771c1a7c011b5f8cb634 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sun, 5 Apr 2026 21:11:13 -0500 Subject: [PATCH 03/10] test lint --- src/nanoarrow/common/buffer_test.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/nanoarrow/common/buffer_test.cc b/src/nanoarrow/common/buffer_test.cc index 1096faaca..96ea3e53f 100644 --- a/src/nanoarrow/common/buffer_test.cc +++ b/src/nanoarrow/common/buffer_test.cc @@ -17,7 +17,6 @@ #include #include -#include #include #include From ca223daa99d07d766f4dc1a822076cf72d792178 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sun, 5 Apr 2026 21:32:49 -0500 Subject: [PATCH 04/10] just use ArrowBuffer as the user facing type --- src/nanoarrow/common/buffer_test.cc | 51 ++++++++++++++++++----------- src/nanoarrow/common/utils.c | 34 +++++++++---------- src/nanoarrow/hpp/unique.hpp | 19 ----------- src/nanoarrow/hpp/unique_test.cc | 16 --------- src/nanoarrow/ipc/decoder.c | 13 ++++---- src/nanoarrow/ipc/decoder_test.cc | 8 ++--- src/nanoarrow/ipc/ipc_hpp_test.cc | 16 --------- src/nanoarrow/ipc/reader.c | 10 +++--- src/nanoarrow/nanoarrow.h | 34 +++++++------------ src/nanoarrow/nanoarrow_ipc.h | 14 ++------ src/nanoarrow/nanoarrow_ipc.hpp | 4 --- 11 files changed, 77 insertions(+), 142 
deletions(-) diff --git a/src/nanoarrow/common/buffer_test.cc b/src/nanoarrow/common/buffer_test.cc index 96ea3e53f..a9aa248f2 100644 --- a/src/nanoarrow/common/buffer_test.cc +++ b/src/nanoarrow/common/buffer_test.cc @@ -709,10 +709,10 @@ TEST(SharedBufferTest, SharedBufferInitResetEmpty) { struct ArrowBuffer src; ArrowBufferInit(&src); - struct ArrowSharedBuffer shared; + struct ArrowBuffer shared; ASSERT_EQ(ArrowSharedBufferInit(&shared, &src), NANOARROW_OK); - EXPECT_EQ(shared.private_src.data, nullptr); - ArrowSharedBufferReset(&shared); + EXPECT_EQ(shared.data, nullptr); + ArrowBufferReset(&shared); } TEST(SharedBufferTest, SharedBufferInitReset) { @@ -720,15 +720,15 @@ TEST(SharedBufferTest, SharedBufferInitReset) { ArrowBufferInit(&src); ASSERT_EQ(ArrowBufferAppend(&src, "1234", 4), NANOARROW_OK); - struct ArrowSharedBuffer shared; + struct ArrowBuffer shared; ASSERT_EQ(ArrowSharedBufferInit(&shared, &src), NANOARROW_OK); - EXPECT_NE(shared.private_src.data, nullptr); - EXPECT_EQ(shared.private_src.size_bytes, 4); - EXPECT_EQ(memcmp(shared.private_src.data, "1234", 4), 0); + EXPECT_NE(shared.data, nullptr); + EXPECT_EQ(shared.size_bytes, 4); + EXPECT_EQ(memcmp(shared.data, "1234", 4), 0); - ArrowSharedBufferReset(&shared); - EXPECT_EQ(shared.private_src.data, nullptr); - EXPECT_EQ(shared.private_src.size_bytes, 0); + ArrowBufferReset(&shared); + EXPECT_EQ(shared.data, nullptr); + EXPECT_EQ(shared.size_bytes, 0); } TEST(SharedBufferTest, SharedBufferClone) { @@ -736,18 +736,18 @@ TEST(SharedBufferTest, SharedBufferClone) { ArrowBufferInit(&src); ASSERT_EQ(ArrowBufferAppend(&src, "abcdef", 6), NANOARROW_OK); - struct ArrowSharedBuffer shared; + struct ArrowBuffer shared; ASSERT_EQ(ArrowSharedBufferInit(&shared, &src), NANOARROW_OK); // Clone the shared buffer struct ArrowBuffer clone; - ArrowSharedBufferClone(&shared, &clone); - EXPECT_EQ(clone.data, shared.private_src.data); + ASSERT_EQ(ArrowSharedBufferClone(&shared, &clone), NANOARROW_OK); + 
EXPECT_EQ(clone.data, shared.data); EXPECT_EQ(clone.size_bytes, 6); EXPECT_EQ(memcmp(clone.data, "abcdef", 6), 0); // Release the original shared buffer; clone should still be valid - ArrowSharedBufferReset(&shared); + ArrowBufferReset(&shared); EXPECT_EQ(memcmp(clone.data, "abcdef", 6), 0); ArrowBufferReset(&clone); @@ -757,18 +757,29 @@ TEST(SharedBufferTest, SharedBufferCloneEmpty) { struct ArrowBuffer src; ArrowBufferInit(&src); - struct ArrowSharedBuffer shared; + struct ArrowBuffer shared; ASSERT_EQ(ArrowSharedBufferInit(&shared, &src), NANOARROW_OK); struct ArrowBuffer clone; - ArrowSharedBufferClone(&shared, &clone); + ASSERT_EQ(ArrowSharedBufferClone(&shared, &clone), NANOARROW_OK); EXPECT_EQ(clone.data, nullptr); EXPECT_EQ(clone.size_bytes, 0); - ArrowSharedBufferReset(&shared); + ArrowBufferReset(&shared); ArrowBufferReset(&clone); } +TEST(SharedBufferTest, SharedBufferCloneNotShared) { + struct ArrowBuffer buf; + ArrowBufferInit(&buf); + ASSERT_EQ(ArrowBufferAppend(&buf, "abcdef", 6), NANOARROW_OK); + + struct ArrowBuffer clone; + EXPECT_EQ(ArrowSharedBufferClone(&buf, &clone), EINVAL); + + ArrowBufferReset(&buf); +} + TEST(SharedBufferTest, SharedBufferThreadSafeClone) { if (!ArrowSharedBufferIsThreadSafe()) { GTEST_SKIP() << "ArrowSharedBufferIsThreadSafe() returned false"; @@ -778,17 +789,17 @@ TEST(SharedBufferTest, SharedBufferThreadSafeClone) { ArrowBufferInit(&src); ASSERT_EQ(ArrowBufferAppend(&src, "abcdef", 6), NANOARROW_OK); - struct ArrowSharedBuffer shared; + struct ArrowBuffer shared; ASSERT_EQ(ArrowSharedBufferInit(&shared, &src), NANOARROW_OK); // Clone into multiple buffers struct ArrowBuffer clones[10]; for (int i = 0; i < 10; i++) { - ArrowSharedBufferClone(&shared, &clones[i]); + ASSERT_EQ(ArrowSharedBufferClone(&shared, &clones[i]), NANOARROW_OK); } // Release the original - ArrowSharedBufferReset(&shared); + ArrowBufferReset(&shared); // Release clones from separate threads std::thread threads[10]; diff --git 
a/src/nanoarrow/common/utils.c b/src/nanoarrow/common/utils.c index 7b6e3fa92..cfe152b8c 100644 --- a/src/nanoarrow/common/utils.c +++ b/src/nanoarrow/common/utils.c @@ -352,10 +352,10 @@ static void ArrowSharedBufferFree(struct ArrowBufferAllocator* allocator, uint8_ } } -ArrowErrorCode ArrowSharedBufferInit(struct ArrowSharedBuffer* shared, +ArrowErrorCode ArrowSharedBufferInit(struct ArrowBuffer* shared, struct ArrowBuffer* src) { if (src->data == NULL) { - ArrowBufferMove(src, &shared->private_src); + ArrowBufferMove(src, shared); return NANOARROW_OK; } @@ -369,32 +369,32 @@ ArrowErrorCode ArrowSharedBufferInit(struct ArrowSharedBuffer* shared, ArrowBufferMove(src, &private_data->src); ArrowSharedBufferSet(private_data, 1); - ArrowBufferInit(&shared->private_src); - shared->private_src.data = private_data->src.data; - shared->private_src.size_bytes = private_data->src.size_bytes; + ArrowBufferInit(shared); + shared->data = private_data->src.data; + shared->size_bytes = private_data->src.size_bytes; // Don't expose any extra capcity from src so that any calls to ArrowBufferAppend // on this buffer will fail. 
- shared->private_src.capacity_bytes = private_data->src.size_bytes; - shared->private_src.allocator = - ArrowBufferDeallocator(&ArrowSharedBufferFree, private_data); + shared->capacity_bytes = private_data->src.size_bytes; + shared->allocator = ArrowBufferDeallocator(&ArrowSharedBufferFree, private_data); return NANOARROW_OK; } -void ArrowSharedBufferClone(struct ArrowSharedBuffer* shared, - struct ArrowBuffer* shared_out) { - if (shared->private_src.size_bytes == 0) { +ArrowErrorCode ArrowSharedBufferClone(struct ArrowBuffer* shared, + struct ArrowBuffer* shared_out) { + if (shared->size_bytes == 0) { ArrowBufferInit(shared_out); - return; + return NANOARROW_OK; + } + + if (shared->allocator.free != &ArrowSharedBufferFree) { + return EINVAL; } struct ArrowSharedBufferPrivate* private_data = - (struct ArrowSharedBufferPrivate*)shared->private_src.allocator.private_data; + (struct ArrowSharedBufferPrivate*)shared->allocator.private_data; ArrowSharedBufferUpdate(private_data, 1); memcpy(shared_out, shared, sizeof(struct ArrowBuffer)); -} - -void ArrowSharedBufferReset(struct ArrowSharedBuffer* shared) { - ArrowBufferReset(&shared->private_src); + return NANOARROW_OK; } static const int kInt32DecimalDigits = 9; diff --git a/src/nanoarrow/hpp/unique.hpp b/src/nanoarrow/hpp/unique.hpp index 7fb569f7d..a67f4fcdd 100644 --- a/src/nanoarrow/hpp/unique.hpp +++ b/src/nanoarrow/hpp/unique.hpp @@ -135,22 +135,6 @@ inline void release_pointer(struct ArrowArrayView* data) { ArrowArrayViewReset(data); } -template <> -inline void init_pointer(struct ArrowSharedBuffer* data) { - init_pointer(&data->private_src); -} - -template <> -inline void move_pointer(struct ArrowSharedBuffer* src, - struct ArrowSharedBuffer* dst) { - move_pointer(&src->private_src, &dst->private_src); -} - -template <> -inline void release_pointer(struct ArrowSharedBuffer* data) { - ArrowSharedBufferReset(data); -} - /// \brief A unique_ptr-like base class for stack-allocatable objects /// \tparam T The 
object type template @@ -235,9 +219,6 @@ using UniqueBitmap = internal::Unique; /// \brief Class wrapping a unique struct ArrowArrayView using UniqueArrayView = internal::Unique; -/// \brief Class wrapping a unique struct ArrowSharedBuffer -using UniqueSharedBuffer = internal::Unique; - /// @} NANOARROW_CXX_NAMESPACE_END diff --git a/src/nanoarrow/hpp/unique_test.cc b/src/nanoarrow/hpp/unique_test.cc index 9b1b77310..d9d25cacf 100644 --- a/src/nanoarrow/hpp/unique_test.cc +++ b/src/nanoarrow/hpp/unique_test.cc @@ -180,19 +180,3 @@ TEST(HppUnique, UniqueArrayView) { EXPECT_EQ(array_view2->storage_type, NANOARROW_TYPE_UNINITIALIZED); EXPECT_EQ(array_view3->storage_type, NANOARROW_TYPE_STRUCT); } - -TEST(HppUnique, UniqueSharedBuffer) { - nanoarrow::UniqueSharedBuffer shared_buffer; - - nanoarrow::UniqueBuffer buffer; - ASSERT_EQ(ArrowBufferAppend(buffer.get(), "1234", 4), NANOARROW_OK); - - EXPECT_EQ(shared_buffer->private_src.data, nullptr); - ASSERT_EQ(ArrowSharedBufferInit(shared_buffer.get(), buffer.get()), NANOARROW_OK); - EXPECT_NE(shared_buffer->private_src.data, nullptr); - - nanoarrow::UniqueSharedBuffer shared_buffer2 = std::move(shared_buffer); - EXPECT_NE(shared_buffer2->private_src.data, nullptr); - EXPECT_EQ(shared_buffer->private_src.data, // NOLINT(clang-analyzer-cplusplus.Move) - nullptr); -} diff --git a/src/nanoarrow/ipc/decoder.c b/src/nanoarrow/ipc/decoder.c index d9d5ed9f5..f266c740b 100644 --- a/src/nanoarrow/ipc/decoder.c +++ b/src/nanoarrow/ipc/decoder.c @@ -1930,7 +1930,7 @@ struct ArrowIpcBufferSource { /// has been read into memory. This abstraction is currently internal and exists /// to support the two obvious ways a user might go about this: (1) using a /// non-owned view of memory that must be copied slice-wise or (2) adding a reference -/// to an ArrowIpcSharedBuffer and returning a slice of that memory. +/// to an ArrowBuffer (shared buffer) and returning a slice of that memory. 
struct ArrowIpcBufferFactory { /// \brief User-defined callback to populate a buffer view /// @@ -2056,14 +2056,13 @@ static ArrowErrorCode ArrowIpcMakeBufferFromShared(struct ArrowIpcBufferFactory* struct ArrowBufferView* dst_view, struct ArrowBuffer* dst, struct ArrowError* error) { - struct ArrowIpcSharedBuffer* shared = - (struct ArrowIpcSharedBuffer*)factory->private_data; + struct ArrowBuffer* shared = (struct ArrowBuffer*)factory->private_data; int needs_decompression = 0; int uncompressed_data_offset = 0; if (src->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) { struct ArrowBufferView src_view; - src_view.data.as_uint8 = shared->private_src.data + src->body_offset_bytes; + src_view.data.as_uint8 = shared->data + src->body_offset_bytes; src_view.size_bytes = src->buffer_length_bytes; NANOARROW_RETURN_NOT_OK(ArrowIpcDecompressBufferFromView( factory->decompressor, src->codec, src_view, dst, &needs_decompression, error)); @@ -2072,7 +2071,7 @@ static ArrowErrorCode ArrowIpcMakeBufferFromShared(struct ArrowIpcBufferFactory* if (!needs_decompression) { ArrowBufferReset(dst); - ArrowSharedBufferClone(shared, dst); + NANOARROW_RETURN_NOT_OK(ArrowSharedBufferClone(shared, dst)); dst->data += src->body_offset_bytes + uncompressed_data_offset; dst->size_bytes = src->buffer_length_bytes - uncompressed_data_offset; } @@ -2083,7 +2082,7 @@ static ArrowErrorCode ArrowIpcMakeBufferFromShared(struct ArrowIpcBufferFactory* } static struct ArrowIpcBufferFactory ArrowIpcBufferFactoryFromShared( - struct ArrowIpcSharedBuffer* shared) { + struct ArrowBuffer* shared) { struct ArrowIpcBufferFactory out; out.make_buffer = &ArrowIpcMakeBufferFromShared; out.decompressor = NULL; @@ -2594,7 +2593,7 @@ ArrowErrorCode ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( } ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared( - struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* body, int64_t i, + struct ArrowIpcDecoder* decoder, struct ArrowBuffer* body, int64_t i, struct 
ArrowArray* out, enum ArrowValidationLevel validation_level, struct ArrowError* error) { return ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries(decoder, body, i, NULL, out, diff --git a/src/nanoarrow/ipc/decoder_test.cc b/src/nanoarrow/ipc/decoder_test.cc index 48ed0af1f..694310a9c 100644 --- a/src/nanoarrow/ipc/decoder_test.cc +++ b/src/nanoarrow/ipc/decoder_test.cc @@ -970,7 +970,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchFromShared) { decoder.body_size_bytes), NANOARROW_OK); - struct ArrowIpcSharedBuffer shared; + struct ArrowBuffer shared; ASSERT_EQ(ArrowIpcSharedBufferInit(&shared, &body), NANOARROW_OK); // Check full struct extract @@ -996,7 +996,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchFromShared) { NANOARROW_OK); // Release the original shared (forthcoming array buffers should still be valid) - ArrowIpcSharedBufferReset(&shared); + ArrowBufferReset(&shared); ASSERT_EQ(array.n_buffers, 2); ASSERT_EQ(array.length, 3); @@ -1039,7 +1039,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcSharedBufferThreadSafeDecode) { decoder.body_size_bytes), NANOARROW_OK); - struct ArrowIpcSharedBuffer shared; + struct ArrowBuffer shared; ASSERT_EQ(ArrowIpcSharedBufferInit(&shared, &body), NANOARROW_OK); struct ArrowArray arrays[10]; @@ -1051,7 +1051,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcSharedBufferThreadSafeDecode) { } // Clean up - ArrowIpcSharedBufferReset(&shared); + ArrowBufferReset(&shared); ArrowIpcDecoderReset(&decoder); ArrowSchemaRelease(&schema); diff --git a/src/nanoarrow/ipc/ipc_hpp_test.cc b/src/nanoarrow/ipc/ipc_hpp_test.cc index 0ff57dedf..393a0bd34 100644 --- a/src/nanoarrow/ipc/ipc_hpp_test.cc +++ b/src/nanoarrow/ipc/ipc_hpp_test.cc @@ -19,22 +19,6 @@ #include "nanoarrow/nanoarrow_ipc.hpp" -TEST(NanoarrowIpcHppTest, NanoarrowIpcHppTestUniqueSharedBuffer) { - nanoarrow::ipc::UniqueSharedBuffer shared_buffer; - - nanoarrow::UniqueBuffer buffer; - ASSERT_EQ(ArrowBufferAppend(buffer.get(), "1234", 4), NANOARROW_OK); - - 
EXPECT_EQ(shared_buffer->private_src.data, nullptr); - ASSERT_EQ(ArrowIpcSharedBufferInit(shared_buffer.get(), buffer.get()), NANOARROW_OK); - EXPECT_NE(shared_buffer->private_src.data, nullptr); - - nanoarrow::ipc::UniqueSharedBuffer shared_buffer2 = std::move(shared_buffer); - EXPECT_NE(shared_buffer2->private_src.data, nullptr); - EXPECT_EQ(shared_buffer->private_src.data, // NOLINT(clang-analyzer-cplusplus.Move) - nullptr); -} - TEST(NanoarrowIpcHppTest, NanoarrowIpcHppTestUniqueDecoder) { nanoarrow::ipc::UniqueDecoder decoder; diff --git a/src/nanoarrow/ipc/reader.c b/src/nanoarrow/ipc/reader.c index 897508241..4cb04d799 100644 --- a/src/nanoarrow/ipc/reader.c +++ b/src/nanoarrow/ipc/reader.c @@ -459,14 +459,14 @@ static int ArrowIpcArrayStreamReaderProcessRecordBatch( NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data)); if (private_data->use_shared_buffers) { - struct ArrowIpcSharedBuffer shared; + struct ArrowBuffer shared; NANOARROW_RETURN_NOT_OK_WITH_ERROR( - ArrowIpcSharedBufferInit(&shared, &private_data->body), &private_data->error); - int result = ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( + ArrowSharedBufferInit(&shared, &private_data->body), &private_data->error); + ArrowErrorCode result = ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( &private_data->decoder, &shared, private_data->field_index, &private_data->dictionaries, out, NANOARROW_VALIDATION_LEVEL_FULL, &private_data->error); - ArrowIpcSharedBufferReset(&shared); + ArrowBufferReset(&shared); NANOARROW_RETURN_NOT_OK(result); } else { struct ArrowBufferView body_view; @@ -489,7 +489,7 @@ static int ArrowIpcArrayStreamReaderProcessDictionary( if (private_data->use_shared_buffers) { // Decode the dictionary - struct ArrowIpcSharedBuffer shared; + struct ArrowBuffer shared; NANOARROW_RETURN_NOT_OK_WITH_ERROR( ArrowIpcSharedBufferInit(&shared, &private_data->body), &private_data->error); int result = ArrowIpcDecoderDecodeDictionaryFromShared( diff --git 
a/src/nanoarrow/nanoarrow.h b/src/nanoarrow/nanoarrow.h index 255573cc1..97befc9b2 100644 --- a/src/nanoarrow/nanoarrow.h +++ b/src/nanoarrow/nanoarrow.h @@ -45,10 +45,7 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowBufferAllocatorDefault) #define ArrowBufferDeallocator \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowBufferDeallocator) -#define ArrowSharedBufferInit \ - NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedBufferInit) -#define ArrowSharedBufferReset \ - NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedBufferReset) +#define ArrowSharedBufferInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedBufferInit) #define ArrowSharedBufferIsThreadSafe \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedBufferIsThreadSafe) #define ArrowSharedBufferClone \ @@ -219,25 +216,15 @@ NANOARROW_DLL struct ArrowBufferAllocator ArrowBufferDeallocator( /// /// @{ -/// \brief A structure representing a reference-counted buffer -struct ArrowSharedBuffer { - struct ArrowBuffer private_src; -}; - -/// \brief Initialize the contents of an ArrowSharedBuffer struct +/// \brief Initialize a shared buffer from an ArrowBuffer /// -/// If NANOARROW_OK is returned, the ArrowSharedBuffer takes ownership of -/// src. -NANOARROW_DLL ArrowErrorCode ArrowSharedBufferInit(struct ArrowSharedBuffer* shared, +/// If NANOARROW_OK is returned, shared is a reference-counted buffer that +/// took ownership of src. The caller should release the shared buffer +/// using ArrowBufferReset() when finished. The underlying data will persist +/// until all clones have also been released. +NANOARROW_DLL ArrowErrorCode ArrowSharedBufferInit(struct ArrowBuffer* shared, struct ArrowBuffer* src); -/// \brief Release the caller's copy of the shared buffer -/// -/// When finished, the caller must relinquish its own copy of the shared data -/// using this function. The original buffer will continue to exist until all -/// ArrowArray objects that refer to it have also been released. 
-NANOARROW_DLL void ArrowSharedBufferReset(struct ArrowSharedBuffer* shared); - /// \brief Check for shared buffer thread safety /// /// Thread-safe shared buffers require C11 and the stdatomic.h header. @@ -248,9 +235,10 @@ NANOARROW_DLL int ArrowSharedBufferIsThreadSafe(void); /// \brief Clone a shared buffer, incrementing its reference count /// /// This creates a new ArrowBuffer that shares the same underlying data with the -/// original shared buffer. The reference count is incremented. -NANOARROW_DLL void ArrowSharedBufferClone(struct ArrowSharedBuffer* shared, - struct ArrowBuffer* shared_out); +/// original shared buffer. The reference count is incremented. Returns EINVAL +/// if shared is not a shared buffer created by ArrowSharedBufferInit(). +NANOARROW_DLL ArrowErrorCode ArrowSharedBufferClone(struct ArrowBuffer* shared, + struct ArrowBuffer* shared_out); /// @} diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h index 4b3036150..8031bd323 100644 --- a/src/nanoarrow/nanoarrow_ipc.h +++ b/src/nanoarrow/nanoarrow_ipc.h @@ -25,7 +25,6 @@ #define ArrowIpcCheckRuntime NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcCheckRuntime) #define ArrowIpcSharedBufferIsThreadSafe ArrowSharedBufferIsThreadSafe #define ArrowIpcSharedBufferInit ArrowSharedBufferInit -#define ArrowIpcSharedBufferReset ArrowSharedBufferReset #define ArrowIpcGetZstdDecompressionFunction \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcGetZstdDecompressionFunction) #define ArrowIpcGetLZ4DecompressionFunction \ @@ -143,7 +142,6 @@ // Backward-compatible aliases (non-namespaced builds) #define ArrowIpcSharedBufferIsThreadSafe ArrowSharedBufferIsThreadSafe #define ArrowIpcSharedBufferInit ArrowSharedBufferInit -#define ArrowIpcSharedBufferReset ArrowSharedBufferReset #endif @@ -337,12 +335,6 @@ static inline enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) { } } -/// \brief A structure representing a reference-counted buffer that may be passed to -/// 
ArrowIpcDecoderDecodeArrayFromShared(). -/// -/// \deprecated Use ArrowSharedBuffer instead. -#define ArrowIpcSharedBuffer ArrowSharedBuffer - /// \brief A user-extensible decompressor /// /// The ArrowIpcDecompressor is the underlying object that enables decompression in the @@ -647,9 +639,9 @@ NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArray( /// \brief Decode an ArrowArray from an owned buffer with dictionary decoding support /// /// This implementation takes advantage of the fact that it can avoid copying individual -/// buffers. In all cases the caller must ArrowIpcSharedBufferReset() body after one or +/// buffers. In all cases the caller must ArrowBufferReset() body after one or /// more calls to ArrowIpcDecoderDecodeArrayFromShared(). If -/// ArrowIpcSharedBufferIsThreadSafe() returns 0, out must not be released by another +/// ArrowSharedBufferIsThreadSafe() returns 0, out must not be released by another /// thread. NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared, int64_t i, @@ -661,7 +653,7 @@ NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayFromSharedWithDictionarie /// Equivalent to calling ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries() with /// dictionaries as NULL. NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared( - struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared, int64_t i, + struct ArrowIpcDecoder* decoder, struct ArrowBuffer* shared, int64_t i, struct ArrowArray* out, enum ArrowValidationLevel validation_level, struct ArrowError* error); diff --git a/src/nanoarrow/nanoarrow_ipc.hpp b/src/nanoarrow/nanoarrow_ipc.hpp index 0b231cd4f..e6b23924f 100644 --- a/src/nanoarrow/nanoarrow_ipc.hpp +++ b/src/nanoarrow/nanoarrow_ipc.hpp @@ -197,10 +197,6 @@ namespace ipc { /// /// @{ -/// \brief Class wrapping a unique struct ArrowSharedBuffer -/// \deprecated Use nanoarrow::UniqueSharedBuffer instead. 
-using UniqueSharedBuffer = internal::Unique; - /// \brief Class wrapping a unique struct ArrowIpcDecoder using UniqueDecoder = internal::Unique; From 6f25dd97fc5be5bc964e1f46b8a0012b752abf3d Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sun, 5 Apr 2026 23:20:39 -0500 Subject: [PATCH 05/10] shared buffers from arrays --- src/nanoarrow/common/array.c | 2 +- src/nanoarrow/common/buffer_test.cc | 150 ++++++++++++++++++++++++++++ src/nanoarrow/common/utils.c | 129 ++++++++++++++++++++++-- src/nanoarrow/nanoarrow.h | 62 +++++++++++- 4 files changed, 331 insertions(+), 12 deletions(-) diff --git a/src/nanoarrow/common/array.c b/src/nanoarrow/common/array.c index ec7bdb667..dfc3f46da 100644 --- a/src/nanoarrow/common/array.c +++ b/src/nanoarrow/common/array.c @@ -73,7 +73,7 @@ static void ArrowArrayReleaseInternal(struct ArrowArray* array) { array->release = NULL; } -static int ArrowArrayIsInternal(struct ArrowArray* array) { +int ArrowArrayIsInternal(struct ArrowArray* array) { return array->release == &ArrowArrayReleaseInternal; } diff --git a/src/nanoarrow/common/buffer_test.cc b/src/nanoarrow/common/buffer_test.cc index a9aa248f2..3b3cc8af6 100644 --- a/src/nanoarrow/common/buffer_test.cc +++ b/src/nanoarrow/common/buffer_test.cc @@ -814,3 +814,153 @@ TEST(SharedBufferTest, SharedBufferThreadSafeClone) { threads[i].join(); } } + +// Helper to build a simple int32 array with known data for shared array tests +static void MakeInt32Array(struct ArrowArray* array, const int32_t* values, int64_t n) { + ASSERT_EQ(ArrowArrayInitFromType(array, NANOARROW_TYPE_INT32), NANOARROW_OK); + ASSERT_EQ(ArrowArrayStartAppending(array), NANOARROW_OK); + for (int64_t i = 0; i < n; i++) { + ASSERT_EQ(ArrowArrayAppendInt(array, values[i]), NANOARROW_OK); + } + ASSERT_EQ(ArrowArrayFinishBuildingDefault(array, nullptr), NANOARROW_OK); +} + +TEST(SharedArrayTest, SharedArrayInitRelease) { + int32_t values[] = {1, 2, 3, 4}; + struct ArrowArray src; + MakeInt32Array(&src, values, 4); + 
ASSERT_NE(src.release, nullptr); + + struct ArrowSharedArray shared; + ASSERT_EQ(ArrowSharedArrayInit(&shared, &src), NANOARROW_OK); + // src should have been moved + EXPECT_EQ(src.release, nullptr); + + ArrowSharedArrayRelease(&shared); + EXPECT_EQ(shared.private_data, nullptr); +} + +TEST(SharedArrayTest, SharedArrayReleaseNull) { + struct ArrowSharedArray shared; + shared.private_data = nullptr; + // Should not crash + ArrowSharedArrayRelease(&shared); +} + +TEST(SharedArrayTest, SharedArrayBuffer) { + int32_t values[] = {10, 20, 30}; + struct ArrowArray src; + MakeInt32Array(&src, values, 3); + + struct ArrowSharedArray shared; + ASSERT_EQ(ArrowSharedArrayInit(&shared, &src), NANOARROW_OK); + + // Buffer 1 is the data buffer for int32 + struct ArrowBuffer buf; + ASSERT_EQ(ArrowSharedArrayBuffer(&shared, 1, &buf), NANOARROW_OK); + EXPECT_NE(buf.data, nullptr); + // Internal array: size should be known + EXPECT_EQ(buf.size_bytes, 3 * sizeof(int32_t)); + const int32_t* data = reinterpret_cast(buf.data); + EXPECT_EQ(data[0], 10); + EXPECT_EQ(data[1], 20); + EXPECT_EQ(data[2], 30); + + // Release the shared array; buffer should still be valid + ArrowSharedArrayRelease(&shared); + EXPECT_EQ(reinterpret_cast(buf.data)[0], 10); + + ArrowBufferReset(&buf); +} + +TEST(SharedArrayTest, SharedArrayBufferClone) { + int32_t values[] = {100, 200}; + struct ArrowArray src; + MakeInt32Array(&src, values, 2); + + struct ArrowSharedArray shared; + ASSERT_EQ(ArrowSharedArrayInit(&shared, &src), NANOARROW_OK); + + struct ArrowBuffer buf; + ASSERT_EQ(ArrowSharedArrayBuffer(&shared, 1, &buf), NANOARROW_OK); + + // Clone the buffer obtained from the shared array + struct ArrowBuffer clone; + ASSERT_EQ(ArrowSharedBufferClone(&buf, &clone), NANOARROW_OK); + EXPECT_EQ(clone.data, buf.data); + EXPECT_EQ(clone.size_bytes, buf.size_bytes); + + // Release original shared array and buffer; clone should still be valid + ArrowSharedArrayRelease(&shared); + ArrowBufferReset(&buf); + const 
int32_t* data = reinterpret_cast(clone.data); + EXPECT_EQ(data[0], 100); + EXPECT_EQ(data[1], 200); + + ArrowBufferReset(&clone); +} + +TEST(SharedArrayTest, SharedArrayMultipleBuffers) { + int32_t values[] = {1, 2, 3, 4}; + struct ArrowArray src; + MakeInt32Array(&src, values, 4); + + struct ArrowSharedArray shared; + ASSERT_EQ(ArrowSharedArrayInit(&shared, &src), NANOARROW_OK); + + // Extract both buffers + struct ArrowBuffer buf0, buf1; + ASSERT_EQ(ArrowSharedArrayBuffer(&shared, 0, &buf0), NANOARROW_OK); + ASSERT_EQ(ArrowSharedArrayBuffer(&shared, 1, &buf1), NANOARROW_OK); + + // Release shared array; buffers should keep data alive + ArrowSharedArrayRelease(&shared); + + EXPECT_EQ(buf1.size_bytes, 4 * sizeof(int32_t)); + const int32_t* data = reinterpret_cast(buf1.data); + EXPECT_EQ(data[0], 1); + EXPECT_EQ(data[3], 4); + + // Release one buffer; remaining buffer should still be valid + ArrowBufferReset(&buf0); + EXPECT_EQ(reinterpret_cast(buf1.data)[2], 3); + + ArrowBufferReset(&buf1); +} + +TEST(SharedArrayTest, SharedArrayThreadSafeBuffer) { + if (!ArrowSharedBufferIsThreadSafe()) { + GTEST_SKIP() << "ArrowSharedBufferIsThreadSafe() returned false"; + } + + int32_t values[] = {42, 43, 44, 45}; + struct ArrowArray src; + MakeInt32Array(&src, values, 4); + + struct ArrowSharedArray shared; + ASSERT_EQ(ArrowSharedArrayInit(&shared, &src), NANOARROW_OK); + + // Extract multiple buffer clones + struct ArrowBuffer buffers[10]; + for (int i = 0; i < 10; i++) { + ASSERT_EQ(ArrowSharedArrayBuffer(&shared, 1, &buffers[i]), NANOARROW_OK); + } + + // Release the shared array + ArrowSharedArrayRelease(&shared); + + // Release buffers from separate threads + std::thread threads[10]; + for (int i = 0; i < 10; i++) { + threads[i] = std::thread([&buffers, i] { + const int32_t* data = reinterpret_cast(buffers[i].data); + EXPECT_EQ(data[0], 42); + EXPECT_EQ(data[3], 45); + ArrowBufferReset(&buffers[i]); + }); + } + + for (int i = 0; i < 10; i++) { + threads[i].join(); + } +} 
diff --git a/src/nanoarrow/common/utils.c b/src/nanoarrow/common/utils.c index cfe152b8c..c6943a0ac 100644 --- a/src/nanoarrow/common/utils.c +++ b/src/nanoarrow/common/utils.c @@ -317,6 +317,22 @@ static void ArrowSharedBufferSet(struct ArrowSharedBufferPrivate* private_data, } int ArrowSharedBufferIsThreadSafe(void) { return 1; } + +struct ArrowSharedArrayPrivate { + struct ArrowArray src; + atomic_long reference_count; +}; + +static int64_t ArrowSharedArrayUpdate(struct ArrowSharedArrayPrivate* private_data, + int delta) { + int64_t old_count = atomic_fetch_add(&private_data->reference_count, delta); + return old_count + delta; +} + +static void ArrowSharedArraySet(struct ArrowSharedArrayPrivate* private_data, + int64_t count) { + atomic_store(&private_data->reference_count, count); +} #else struct ArrowSharedBufferPrivate { struct ArrowBuffer src; @@ -335,6 +351,22 @@ static void ArrowSharedBufferSet(struct ArrowSharedBufferPrivate* private_data, } int ArrowSharedBufferIsThreadSafe(void) { return 0; } + +struct ArrowSharedArrayPrivate { + struct ArrowArray src; + int64_t reference_count; +}; + +static int64_t ArrowSharedArrayUpdate(struct ArrowSharedArrayPrivate* private_data, + int delta) { + private_data->reference_count += delta; + return private_data->reference_count; +} + +static void ArrowSharedArraySet(struct ArrowSharedArrayPrivate* private_data, + int64_t count) { + private_data->reference_count = count; +} #endif static void ArrowSharedBufferFree(struct ArrowBufferAllocator* allocator, uint8_t* ptr, @@ -352,6 +384,22 @@ static void ArrowSharedBufferFree(struct ArrowBufferAllocator* allocator, uint8_ } } +static void ArrowSharedArrayBufferFree(struct ArrowBufferAllocator* allocator, + uint8_t* ptr, int64_t size) { + NANOARROW_UNUSED(ptr); + NANOARROW_UNUSED(size); + + struct ArrowSharedArrayPrivate* private_data = + (struct ArrowSharedArrayPrivate*)allocator->private_data; + + if (ArrowSharedArrayUpdate(private_data, -1) == 0) { + if 
(private_data->src.release != NULL) { + ArrowArrayRelease(&private_data->src); + } + ArrowFree(private_data); + } +} + ArrowErrorCode ArrowSharedBufferInit(struct ArrowBuffer* shared, struct ArrowBuffer* src) { if (src->data == NULL) { @@ -386,17 +434,86 @@ ArrowErrorCode ArrowSharedBufferClone(struct ArrowBuffer* shared, return NANOARROW_OK; } - if (shared->allocator.free != &ArrowSharedBufferFree) { - return EINVAL; + if (shared->allocator.free == &ArrowSharedBufferFree) { + struct ArrowSharedBufferPrivate* private_data = + (struct ArrowSharedBufferPrivate*)shared->allocator.private_data; + ArrowSharedBufferUpdate(private_data, 1); + memcpy(shared_out, shared, sizeof(struct ArrowBuffer)); + return NANOARROW_OK; } - struct ArrowSharedBufferPrivate* private_data = - (struct ArrowSharedBufferPrivate*)shared->allocator.private_data; - ArrowSharedBufferUpdate(private_data, 1); - memcpy(shared_out, shared, sizeof(struct ArrowBuffer)); + if (shared->allocator.free == &ArrowSharedArrayBufferFree) { + struct ArrowSharedArrayPrivate* private_data = + (struct ArrowSharedArrayPrivate*)shared->allocator.private_data; + ArrowSharedArrayUpdate(private_data, 1); + memcpy(shared_out, shared, sizeof(struct ArrowBuffer)); + return NANOARROW_OK; + } + + return EINVAL; +} + +ArrowErrorCode ArrowSharedArrayInit(struct ArrowSharedArray* shared, + struct ArrowArray* src) { + struct ArrowSharedArrayPrivate* private_data = + (struct ArrowSharedArrayPrivate*)ArrowMalloc( + sizeof(struct ArrowSharedArrayPrivate)); + if (private_data == NULL) { + return ENOMEM; + } + + ArrowArrayMove(src, &private_data->src); + ArrowSharedArraySet(private_data, 1); + shared->private_data = private_data; return NANOARROW_OK; } +void ArrowSharedArrayRelease(struct ArrowSharedArray* shared) { + if (shared->private_data == NULL) { + return; + } + + struct ArrowSharedArrayPrivate* private_data = + (struct ArrowSharedArrayPrivate*)shared->private_data; + + if (ArrowSharedArrayUpdate(private_data, -1) == 0) { + if 
(private_data->src.release != NULL) { + ArrowArrayRelease(&private_data->src); + } + ArrowFree(private_data); + } + + shared->private_data = NULL; +} + +ArrowErrorCode ArrowSharedArrayBuffer(struct ArrowSharedArray* shared, + int64_t i, struct ArrowBuffer* out) { + struct ArrowSharedArrayPrivate* private_data = + (struct ArrowSharedArrayPrivate*)shared->private_data; + NANOARROW_DCHECK(i >= 0 && i < private_data->src.n_buffers); + + ArrowSharedArrayUpdate(private_data, 1); + ArrowBufferInit(out); + + if (ArrowArrayIsInternal(&private_data->src)) { + // The source array was built with nanoarrow, so we can get buffer size info + struct ArrowBuffer* src = ArrowArrayBuffer(&private_data->src, i); + out->data = src->data; + out->size_bytes = src->size_bytes; + out->capacity_bytes = src->size_bytes; + } else { + // Generic C Data Interface array: buffer sizes are not known + out->data = (uint8_t*)private_data->src.buffers[i]; + out->size_bytes = 0; + out->capacity_bytes = 0; + } + + out->allocator = ArrowBufferDeallocator(&ArrowSharedArrayBufferFree, private_data); + return NANOARROW_OK; +} + + + static const int kInt32DecimalDigits = 9; static const uint64_t kUInt32PowersOfTen[] = { diff --git a/src/nanoarrow/nanoarrow.h b/src/nanoarrow/nanoarrow.h index 97befc9b2..43d948bbd 100644 --- a/src/nanoarrow/nanoarrow.h +++ b/src/nanoarrow/nanoarrow.h @@ -50,6 +50,11 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedBufferIsThreadSafe) #define ArrowSharedBufferClone \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedBufferClone) +#define ArrowSharedArrayInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedArrayInit) +#define ArrowSharedArrayRelease \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedArrayRelease) +#define ArrowSharedArrayBuffer \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedArrayBuffer) #define ArrowErrorSet NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowErrorSet) #define ArrowLayoutInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowLayoutInit) #define 
ArrowDecimalSetDigits NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDecimalSetDigits) @@ -99,6 +104,7 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowMetadataBuilderRemove) #define ArrowSchemaViewInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSchemaViewInit) #define ArrowSchemaToString NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSchemaToString) +#define ArrowArrayIsInternal NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowArrayIsInternal) #define ArrowArrayInitFromType \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowArrayInitFromType) #define ArrowArrayInitFromSchema \ @@ -208,11 +214,18 @@ NANOARROW_DLL struct ArrowBufferAllocator ArrowBufferDeallocator( /// @} -/// \defgroup nanoarrow-shared-buffer Shared buffers +/// \defgroup nanoarrow-shared-buffer Shared buffers and arrays /// -/// A shared buffer is a reference-counted ArrowBuffer that may be shared -/// across multiple ArrowArray objects. This avoids copying when decoding -/// arrays from a single source buffer. +/// The nanoarrow library implements two kinds of shared buffers: those backed by +/// a reference-counted ArrowBuffer and those backed by a reference-counted ArrowArray. +/// These are useful for building ArrowArrays using buffers without copying the bytes +/// of the source. For example, the IPC implementation uses shared buffers derived from +/// an ArrowBuffer to avoid copying buffers from an IPC message, and shared buffers +/// derived from arrays are used to make a safe shallow clone of an array. +/// +/// Reference counts use C11 atomics when available and are not necessarily thread safe +/// on all platforms (notably, MSVC requires `/experimental:c11atomics`). Use +/// ArrowSharedBufferIsThreadSafe() to check for thread safety if this is needed. /// /// @{ @@ -236,10 +249,43 @@ NANOARROW_DLL int ArrowSharedBufferIsThreadSafe(void); /// /// This creates a new ArrowBuffer that shares the same underlying data with the /// original shared buffer. The reference count is incremented.
Returns EINVAL -/// if shared is not a shared buffer created by ArrowSharedBufferInit(). +/// if shared is not a shared buffer created by ArrowSharedBufferInit() or +/// obtained from ArrowSharedArrayBuffer(). NANOARROW_DLL ArrowErrorCode ArrowSharedBufferClone(struct ArrowBuffer* shared, struct ArrowBuffer* shared_out); +/// \brief An opaque handle to a shared, reference-counted ArrowArray +struct ArrowSharedArray { + void* private_data; +}; + +/// \brief Initialize a shared array from an ArrowArray +/// +/// Takes ownership of src (sets src->release to NULL). The shared array +/// should be released with ArrowSharedArrayRelease() when all buffers have +/// been extracted from the source. The original array will be released +/// when all borrowed buffers have been released. +NANOARROW_DLL ArrowErrorCode ArrowSharedArrayInit(struct ArrowSharedArray* shared, + struct ArrowArray* src); + +/// \brief Release the caller's reference to a shared array +/// +/// Decrements the reference count. When no more references remain +/// (including any buffers obtained via ArrowSharedArrayBuffer()), +/// the underlying ArrowArray is released. +NANOARROW_DLL void ArrowSharedArrayRelease(struct ArrowSharedArray* shared); + +/// \brief Obtain a buffer from a shared array +/// +/// Returns an ArrowBuffer that views buffer i of the underlying ArrowArray. +/// If the source array was built with nanoarrow (i.e. ArrowArrayIsInternal() +/// returns true), buffer sizes are known and propagated to the output. +/// Otherwise the output buffer's size_bytes will be 0. The shared array's +/// reference count is incremented. The returned buffer is compatible with +/// ArrowSharedBufferClone(). 
+NANOARROW_DLL ArrowErrorCode ArrowSharedArrayBuffer(struct ArrowSharedArray* shared, + int64_t i, struct ArrowBuffer* out); + /// @} /// \brief Move the contents of an src ArrowSchema into dst and set src->release to NULL @@ -1077,6 +1123,12 @@ static inline ArrowErrorCode ArrowArrayShrinkToFit(struct ArrowArray* array); NANOARROW_DLL ArrowErrorCode ArrowArrayFinishBuildingDefault(struct ArrowArray* array, struct ArrowError* error); +/// \brief Check if an ArrowArray was allocated by nanoarrow +/// +/// Returns non-zero if array was allocated using ArrowArrayInitFromType(), +/// ArrowArrayInitFromSchema(), or ArrowArrayInitFromArrayView(). +NANOARROW_DLL int ArrowArrayIsInternal(struct ArrowArray* array); + /// \brief Finish building an ArrowArray with explicit validation /// /// Finish building with an explicit validation level. This could perform less validation From f66f204a77cc82d67054d40b84792b819495a110 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 6 Apr 2026 00:11:32 -0500 Subject: [PATCH 06/10] clone shared --- src/nanoarrow/common/array.c | 137 +++++++++++++++++++++++++++++++++++ src/nanoarrow/common/utils.c | 11 ++- src/nanoarrow/nanoarrow.h | 31 +++++++- 3 files changed, 173 insertions(+), 6 deletions(-) diff --git a/src/nanoarrow/common/array.c b/src/nanoarrow/common/array.c index dfc3f46da..2b7e820e0 100644 --- a/src/nanoarrow/common/array.c +++ b/src/nanoarrow/common/array.c @@ -568,6 +568,143 @@ ArrowErrorCode ArrowArrayFinishBuildingDefault(struct ArrowArray* array, return ArrowArrayFinishBuilding(array, NANOARROW_VALIDATION_LEVEL_DEFAULT, error); } +static int ArrowArrayIsShared(struct ArrowArray* array) { + if (!ArrowArrayIsInternal(array)) { + return 0; + } + + for (int64_t i = 0; i < array->n_buffers; i++) { + struct ArrowBuffer* buffer = ArrowArrayBuffer(array, i); + if (buffer->data != NULL && !ArrowIsSharedBuffer(buffer)) { + return 0; + } + } + + for (int64_t i = 0; i < array->n_children; i++) { + if 
(!ArrowArrayIsShared(array->children[i])) { + return 0; + } + } + + if (array->dictionary != NULL && !ArrowArrayIsShared(array->dictionary)) { + return 0; + } + + return 1; +} + +static ArrowErrorCode ArrowArrayMoveSharedInternal(struct ArrowArray* src, + struct ArrowArray* dst) { + if (ArrowArrayIsShared(src)) { + ArrowArrayMove(src, dst); + return NANOARROW_OK; + } + + NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromType(dst, NANOARROW_TYPE_UNINITIALIZED)); + + // Allocate children and move source children to dst children + NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateChildren(dst, src->n_children)); + for (int64_t i = 0; i < src->n_children; i++) { + NANOARROW_RETURN_NOT_OK( + ArrowArrayMoveSharedInternal(src->children[i], dst->children[i])); + } + + // Allocate dictionary if needed and move src's dictionary (not src itself) into it + if (src->dictionary != NULL) { + NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateDictionary(dst)); + NANOARROW_RETURN_NOT_OK(ArrowArrayMoveSharedInternal(src->dictionary, dst->dictionary)); + } + + // We might need some more buffers if we are shallowly copying a string/binary view + if (src->n_buffers > 3) { + if (src->n_buffers > INT_MAX) { + return EINVAL; + } + + NANOARROW_RETURN_NOT_OK( + ArrowArrayAddVariadicBuffers(dst, (int32_t)src->n_buffers - 3)); + } + + // Move src into a shared array and set dst's buffers using the ref-counted version + struct ArrowSharedArray shared_array; + NANOARROW_RETURN_NOT_OK(ArrowSharedArrayInit(&shared_array, src)); + + for (int64_t i = 0; i < src->n_buffers; i++) { + struct ArrowBuffer* dst_buffer = ArrowArrayBuffer(dst, i); + ArrowErrorCode result = ArrowSharedArrayBuffer(&shared_array, i, dst_buffer); + if (result != NANOARROW_OK) { + ArrowSharedArrayRelease(&shared_array); + return result; + } + } + + ArrowSharedArrayRelease(&shared_array); + return NANOARROW_OK; +} + +static ArrowErrorCode ArrowArrayCloneSharedInternal(struct ArrowArray* src, + struct ArrowArray* dst) { + NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromType(dst,
NANOARROW_TYPE_UNINITIALIZED)); + + // Allocate children and clone source children into dst children (src is not consumed) + NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateChildren(dst, src->n_children)); + for (int64_t i = 0; i < src->n_children; i++) { + NANOARROW_RETURN_NOT_OK( + ArrowArrayCloneSharedInternal(src->children[i], dst->children[i])); + } + + // Allocate dictionary if needed and clone src's dictionary (not src itself) into it + if (src->dictionary != NULL) { + NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateDictionary(dst)); + NANOARROW_RETURN_NOT_OK(ArrowArrayCloneSharedInternal(src->dictionary, dst->dictionary)); + } + + // We might need some more buffers if we are shallowly copying a string/binary view + if (src->n_buffers > 3) { + if (src->n_buffers > INT_MAX) { + return EINVAL; + } + + NANOARROW_RETURN_NOT_OK( + ArrowArrayAddVariadicBuffers(dst, (int32_t)src->n_buffers - 3)); + } + + for (int64_t i = 0; i < src->n_buffers; i++) { + struct ArrowBuffer* src_buffer = ArrowArrayBuffer(src, i); + struct ArrowBuffer* dst_buffer = ArrowArrayBuffer(dst, i); + NANOARROW_RETURN_NOT_OK(ArrowSharedBufferClone(src_buffer, dst_buffer)); + } + + return NANOARROW_OK; +} + +ArrowErrorCode ArrowArrayMoveShared(struct ArrowArray* array, struct ArrowArray* shared) { + // Build directly into shared; release the partial result if any step fails + shared->release = NULL; + ArrowErrorCode result = ArrowArrayMoveSharedInternal(array, shared); + if (result != NANOARROW_OK && shared->release != NULL) { + ArrowArrayRelease(shared); + } + + return result; +} + +ArrowErrorCode ArrowArrayCloneShared(struct ArrowArray* shared, + struct ArrowArray* array) { + if (!ArrowArrayIsShared(shared)) { + return EINVAL; + } + + // Build directly into array; release the partial result if any step fails + array->release = NULL; + ArrowErrorCode result = ArrowArrayCloneSharedInternal(shared, array); + if (result != NANOARROW_OK && array->release != NULL) { + ArrowArrayRelease(array); + } + + return result; +} + void ArrowArrayViewInitFromType(struct ArrowArrayView* array_view, enum ArrowType storage_type) { memset(array_view, 0, sizeof(struct ArrowArrayView)); diff --git
a/src/nanoarrow/common/utils.c b/src/nanoarrow/common/utils.c index c6943a0ac..976a509bf 100644 --- a/src/nanoarrow/common/utils.c +++ b/src/nanoarrow/common/utils.c @@ -427,6 +427,11 @@ ArrowErrorCode ArrowSharedBufferInit(struct ArrowBuffer* shared, return NANOARROW_OK; } +int ArrowIsSharedBuffer(struct ArrowBuffer* buffer) { + return buffer->allocator.free == &ArrowSharedBufferFree || + buffer->allocator.free == &ArrowSharedArrayBufferFree; +} + ArrowErrorCode ArrowSharedBufferClone(struct ArrowBuffer* shared, struct ArrowBuffer* shared_out) { if (shared->size_bytes == 0) { @@ -486,8 +491,8 @@ void ArrowSharedArrayRelease(struct ArrowSharedArray* shared) { shared->private_data = NULL; } -ArrowErrorCode ArrowSharedArrayBuffer(struct ArrowSharedArray* shared, - int64_t i, struct ArrowBuffer* out) { +ArrowErrorCode ArrowSharedArrayBuffer(struct ArrowSharedArray* shared, int64_t i, + struct ArrowBuffer* out) { struct ArrowSharedArrayPrivate* private_data = (struct ArrowSharedArrayPrivate*)shared->private_data; NANOARROW_DCHECK(i >= 0 && i < private_data->src.n_buffers); @@ -512,8 +517,6 @@ ArrowErrorCode ArrowSharedArrayBuffer(struct ArrowSharedArray* shared, return NANOARROW_OK; } - - static const int kInt32DecimalDigits = 9; static const uint64_t kUInt32PowersOfTen[] = { diff --git a/src/nanoarrow/nanoarrow.h b/src/nanoarrow/nanoarrow.h index 43d948bbd..1ec13876d 100644 --- a/src/nanoarrow/nanoarrow.h +++ b/src/nanoarrow/nanoarrow.h @@ -55,6 +55,7 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedArrayRelease) #define ArrowSharedArrayBuffer \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowSharedArrayBuffer) +#define ArrowIsSharedBuffer NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIsSharedBuffer) #define ArrowErrorSet NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowErrorSet) #define ArrowLayoutInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowLayoutInit) #define ArrowDecimalSetDigits NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDecimalSetDigits) @@ -125,6 +126,8 @@ 
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowArrayFinishBuilding) #define ArrowArrayFinishBuildingDefault \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowArrayFinishBuildingDefault) +#define ArrowArrayMoveShared NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowArrayMoveShared) +#define ArrowArrayCloneShared NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowArrayCloneShared) #define ArrowArrayViewInitFromType \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowArrayViewInitFromType) #define ArrowArrayViewInitFromSchema \ @@ -245,12 +248,17 @@ NANOARROW_DLL ArrowErrorCode ArrowSharedBufferInit(struct ArrowBuffer* shared, /// the resulting arrays must not be passed to other threads to be released. NANOARROW_DLL int ArrowSharedBufferIsThreadSafe(void); +/// \brief Check if a buffer is a shared buffer +/// +/// Returns non-zero if buffer was created by ArrowSharedBufferInit() or +/// obtained from ArrowSharedArrayBuffer(). +NANOARROW_DLL int ArrowIsSharedBuffer(struct ArrowBuffer* buffer); + /// \brief Clone a shared buffer, incrementing its reference count /// /// This creates a new ArrowBuffer that shares the same underlying data with the /// original shared buffer. The reference count is incremented. Returns EINVAL -/// if shared is not a shared buffer created by ArrowSharedBufferInit() or -/// obtained from ArrowSharedArrayBuffer(). +/// if shared is not a shared buffer (i.e. ArrowIsSharedBuffer() returns false). NANOARROW_DLL ArrowErrorCode ArrowSharedBufferClone(struct ArrowBuffer* shared, struct ArrowBuffer* shared_out); @@ -1140,6 +1148,25 @@ NANOARROW_DLL ArrowErrorCode ArrowArrayFinishBuilding( struct ArrowArray* array, enum ArrowValidationLevel validation_level, struct ArrowError* error); +/// \brief Create a shared copy of an ArrowArray +/// +/// Recursively converts array into a version where all buffers are +/// reference-counted. On success, shared is a new ArrowArray whose buffers +/// are backed by ArrowSharedArray references, and array is consumed +/// (release set to NULL). 
The resulting shared array can be safely moved +/// or have its buffers cloned via ArrowSharedBufferClone(). +NANOARROW_DLL ArrowErrorCode ArrowArrayMoveShared(struct ArrowArray* array, + struct ArrowArray* shared); + +/// \brief Clone a shared ArrowArray +/// +/// Creates a new ArrowArray whose buffers share the same underlying data as +/// the source shared array. The source must have been created by +/// ArrowArrayMoveShared() (i.e. all buffers are reference-counted). +/// Returns EINVAL if shared is not a shared array. +NANOARROW_DLL ArrowErrorCode ArrowArrayCloneShared(struct ArrowArray* shared, + struct ArrowArray* array); + /// @} /// \defgroup nanoarrow-array-view Reading arrays From aa69cdab405c2cdf1bfa8e2e5f472dd180ceb89c Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 6 Apr 2026 00:56:51 -0500 Subject: [PATCH 07/10] use shared array thinger in r bindings --- r/src/altrep.c | 7 +- r/src/array.h | 130 +++++++++-------------------------- src/nanoarrow/common/array.c | 23 ++++++- 3 files changed, 57 insertions(+), 103 deletions(-) diff --git a/r/src/altrep.c b/r/src/altrep.c index 847951b1c..2d0be3967 100644 --- a/r/src/altrep.c +++ b/r/src/altrep.c @@ -168,17 +168,16 @@ SEXP nanoarrow_c_make_altrep_chr(SEXP array_xptr) { // Ensure the array that we're attaching to this ALTREP object does not keep its // parent struct alive unnecessarily (i.e., a user can select only a few columns // and the memory for the unused columns will be released). 
- SEXP array_xptr_independent = PROTECT(array_xptr_ensure_independent(array_xptr)); + array_xptr_ensure_independent(array_xptr); - if (nanoarrow_converter_set_array(converter_xptr, array_xptr_independent) != - NANOARROW_OK) { + if (nanoarrow_converter_set_array(converter_xptr, array_xptr) != NANOARROW_OK) { nanoarrow_converter_stop(converter_xptr); } Rf_setAttrib(converter_xptr, R_ClassSymbol, nanoarrow_cls_altrep_chr); SEXP out = PROTECT(R_new_altrep(nanoarrow_altrep_chr_cls, converter_xptr, R_NilValue)); MARK_NOT_MUTABLE(out); - UNPROTECT(3); + UNPROTECT(2); return out; } diff --git a/r/src/array.h b/r/src/array.h index 96c2849a9..691c39d83 100644 --- a/r/src/array.h +++ b/r/src/array.h @@ -24,7 +24,6 @@ #include #include "buffer.h" #include "nanoarrow.h" -#include "util.h" // Returns an external pointer to an array child with a schema attached. // The returned pointer will keep its parent alive unless passed through @@ -65,86 +64,6 @@ static inline struct ArrowSchema* schema_from_array_xptr(SEXP array_xptr) { } } -static inline SEXP array_ensure_independent(struct ArrowArray* array); -static inline SEXP array_xptr_ensure_independent(SEXP array_xptr); - -// Exports a version of the array pointed to by array_xptr to array_copy -// such that (1) any R references to array_xptr are not invalidated if they exist -// and (2) array_copy->release() can be called independently without invalidating -// R references to array_xptr. This is a recursive operation (i.e., it will -// "explode" the array's children into reference-counted entities where the -// reference counting is handled by R's preserve/release infrastructure). -// Exported arrays and their children have the important property that they -// (and their children) are allocated using nanoarrow's ArrowArrayInit, meaning -// we can modify them safely (i.e., using ArrowArraySetBuffer()). 
-static inline void array_export(SEXP array_xptr, struct ArrowArray* array_copy) { - // If array_xptr has SEXP dependencies (most commonly this would occur if it's - // a borrowed child of a struct array), this will ensure a version that can be - // released independently of its parent. - SEXP independent_array_xptr = PROTECT(array_xptr_ensure_independent(array_xptr)); - struct ArrowArray* array = nanoarrow_array_from_xptr(independent_array_xptr); - - int result = ArrowArrayInitFromType(array_copy, NANOARROW_TYPE_UNINITIALIZED); - if (result != NANOARROW_OK) { - Rf_error("ArrowArrayInitFromType() failed"); - } - - array_copy->length = array->length; - array_copy->null_count = array->null_count; - array_copy->offset = array->offset; - - // Get buffer references, each of which preserve a reference to independent_array_xptr - - // We might need some more buffers to be allocated for string views - if (array->n_buffers > 3) { - result = ArrowArrayAddVariadicBuffers(array_copy, array->n_buffers - 3); - if (result != NANOARROW_OK) { - Rf_error("ArrowArrayAddVariadicBuffers() failed"); - } - } - - array_copy->n_buffers = array->n_buffers; - for (int64_t i = 0; i < array->n_buffers; i++) { - SEXP borrowed_buffer = - PROTECT(buffer_borrowed_xptr(array->buffers[i], 0, independent_array_xptr)); - result = ArrowArraySetBuffer(array_copy, i, - (struct ArrowBuffer*)R_ExternalPtrAddr(borrowed_buffer)); - if (result != NANOARROW_OK) { - array_copy->release(array_copy); - Rf_error("ArrowArraySetBuffer() failed"); - } - UNPROTECT(1); - } - - // Swap out any children for independently releasable children and export them - // into array_copy->children - result = ArrowArrayAllocateChildren(array_copy, array->n_children); - if (result != NANOARROW_OK) { - array_copy->release(array_copy); - Rf_error("ArrowArrayAllocateChildren() failed"); - } - - for (int64_t i = 0; i < array->n_children; i++) { - SEXP independent_child = PROTECT(array_ensure_independent(array->children[i])); - 
array_export(independent_child, array_copy->children[i]); - UNPROTECT(1); - } - - if (array->dictionary != NULL) { - result = ArrowArrayAllocateDictionary(array_copy); - if (result != NANOARROW_OK) { - array_copy->release(array_copy); - Rf_error("ArrowArrayAllocateDictionary() failed"); - } - - SEXP independent_dictionary = PROTECT(array_ensure_independent(array->dictionary)); - array_export(independent_dictionary, array_copy->dictionary); - UNPROTECT(1); - } - - UNPROTECT(1); -} - // When arrays arrive as a nanoarrow_array, they are responsible for // releasing their children. This is fine until we need to keep one // child alive (e.g., a column of a data frame that we attach to an @@ -155,35 +74,52 @@ static inline void array_export(SEXP array_xptr, struct ArrowArray* array_copy) // give an exported version back to the original object. This only // applies if the array_xptr has the external pointer 'prot' field // set (if it doesn't have that set, it is already independent). -static inline SEXP array_ensure_independent(struct ArrowArray* array) { - SEXP original_array_xptr = PROTECT(nanoarrow_array_owning_xptr()); - - // Move array to the newly created owner - struct ArrowArray* original_array = - nanoarrow_output_array_from_xptr(original_array_xptr); - memcpy(original_array, array, sizeof(struct ArrowArray)); - array->release = NULL; +static inline void array_ensure_independent(struct ArrowArray* array) { + SEXP shelter_xptr = PROTECT(nanoarrow_array_owning_xptr()); + struct ArrowArray* tmp = nanoarrow_output_array_from_xptr(shelter_xptr); - // Export the independent array (which keeps a reference to original_array_xptr) - // back to the original home - array_export(original_array_xptr, array); + ArrowErrorCode result = ArrowArrayMoveShared(array, tmp); + if (result != NANOARROW_OK) { + Rf_error("ArrowArrayMoveShared() failed"); + } - // Return the external pointer of the independent array + ArrowArrayMove(tmp, array); UNPROTECT(1); - return original_array_xptr; 
} // This version is like the version that operates on a raw struct ArrowArray* // except it checks if this array has any array dependencies by inspecing the 'Protected' // field of the external pointer: if it that field is R_NilValue, it is already // independent. -static inline SEXP array_xptr_ensure_independent(SEXP array_xptr) { +static inline void array_xptr_ensure_independent(SEXP array_xptr) { struct ArrowArray* array = nanoarrow_array_from_xptr(array_xptr); if (R_ExternalPtrProtected(array_xptr) == R_NilValue) { - return array_xptr; + return; } - return array_ensure_independent(array); + array_ensure_independent(array); +} + +// Exports a version of the array pointed to by array_xptr to array_copy +// such that (1) any R references to array_xptr are not invalidated if they exist +// and (2) array_copy->release() can be called independently without invalidating +// R references to array_xptr. This is a recursive operation (i.e., it will +// "explode" the array's children into reference-counted entities where the +// reference counting is handled by R's preserve/release infrastructure). +// Exported arrays and their children have the important property that they +// (and their children) are allocated using nanoarrow's ArrowArrayInit, meaning +// we can modify them safely (i.e., using ArrowArraySetBuffer()). +static inline void array_export(SEXP array_xptr, struct ArrowArray* array_copy) { + // If array_xptr has SEXP dependencies (most commonly this would occur if it's + // a borrowed child of a struct array), this will ensure a version that can be + // released independently of its parent. 
+ struct ArrowArray* array = nanoarrow_array_from_xptr(array_xptr); + array_ensure_independent(array); + + ArrowErrorCode result = ArrowArrayCloneShared(array, array_copy); + if (result != NANOARROW_OK) { + Rf_error("ArrowArrayCloneShared() failed"); + } } #endif diff --git a/src/nanoarrow/common/array.c b/src/nanoarrow/common/array.c index 2b7e820e0..c8d7bf32c 100644 --- a/src/nanoarrow/common/array.c +++ b/src/nanoarrow/common/array.c @@ -612,7 +612,8 @@ static ArrowErrorCode ArrowArrayMoveSharedInternal(struct ArrowArray* src, // Allocate dictionary if needed and move source dictionary to dst dictionary if (src->dictionary != NULL) { NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateDictionary(dst)); - NANOARROW_RETURN_NOT_OK(ArrowArrayMoveSharedInternal(src, dst)); + NANOARROW_RETURN_NOT_OK( + ArrowArrayMoveSharedInternal(src->dictionary, dst->dictionary)); } // We might need some more buffers if we are shallowly copying a string/binary view @@ -639,6 +640,15 @@ static ArrowErrorCode ArrowArrayMoveSharedInternal(struct ArrowArray* src, } ArrowSharedArrayRelease(&shared_array); + + dst->n_buffers = src->n_buffers; + dst->length = src->length; + dst->null_count = src->null_count; + dst->offset = src->offset; + + // Flush internal buffer pointers to array->buffers + NANOARROW_RETURN_NOT_OK(ArrowArrayFlushInternalPointers(dst)); + return NANOARROW_OK; } @@ -656,7 +666,8 @@ static ArrowErrorCode ArrowArrayCloneSharedInternal(struct ArrowArray* src, // Allocate dictionary if needed and move source dictionary to dst dictionary if (src->dictionary != NULL) { NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateDictionary(dst)); - NANOARROW_RETURN_NOT_OK(ArrowArrayMoveSharedInternal(src, dst)); + NANOARROW_RETURN_NOT_OK( + ArrowArrayMoveSharedInternal(src->dictionary, dst->dictionary)); } // We might need some more buffers if we are shallowly copying a string/binary view @@ -675,6 +686,14 @@ static ArrowErrorCode ArrowArrayCloneSharedInternal(struct ArrowArray* src, 
NANOARROW_RETURN_NOT_OK(ArrowSharedBufferClone(src_buffer, dst_buffer)); } + dst->n_buffers = src->n_buffers; + dst->length = src->length; + dst->null_count = src->null_count; + dst->offset = src->offset; + + // Flush internal buffer pointers to array->buffers + NANOARROW_RETURN_NOT_OK(ArrowArrayFlushInternalPointers(dst)); + return NANOARROW_OK; } From fc77888572e6f9ad75546e96da86e81f6f845584 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 6 Apr 2026 01:15:56 -0500 Subject: [PATCH 08/10] fix issues with initial version --- r/src/array.h | 67 +++++++++++++++++++++++++++++++++--- src/nanoarrow/common/array.c | 8 ++--- 2 files changed, 66 insertions(+), 9 deletions(-) diff --git a/r/src/array.h b/r/src/array.h index 691c39d83..57f43a75a 100644 --- a/r/src/array.h +++ b/r/src/array.h @@ -24,6 +24,7 @@ #include #include "buffer.h" #include "nanoarrow.h" +#include "util.h" // Returns an external pointer to an array child with a schema attached. // The returned pointer will keep its parent alive unless passed through @@ -111,14 +112,70 @@ static inline void array_xptr_ensure_independent(SEXP array_xptr) { // we can modify them safely (i.e., using ArrowArraySetBuffer()). static inline void array_export(SEXP array_xptr, struct ArrowArray* array_copy) { // If array_xptr has SEXP dependencies (most commonly this would occur if it's - // a borrowed child of a struct array), this will ensure a version that can be - // released independently of its parent. + // a borrowed child of a struct array), ensure a version that can be released + // independently. For non-borrowed arrays this is a no-op. 
+ array_xptr_ensure_independent(array_xptr); struct ArrowArray* array = nanoarrow_array_from_xptr(array_xptr); - array_ensure_independent(array); - ArrowErrorCode result = ArrowArrayCloneShared(array, array_copy); + int result = ArrowArrayInitFromType(array_copy, NANOARROW_TYPE_UNINITIALIZED); if (result != NANOARROW_OK) { - Rf_error("ArrowArrayCloneShared() failed"); + Rf_error("ArrowArrayInitFromType() failed"); + } + + array_copy->length = array->length; + array_copy->null_count = array->null_count; + array_copy->offset = array->offset; + + // We might need some more buffers to be allocated for string views + if (array->n_buffers > 3) { + result = ArrowArrayAddVariadicBuffers(array_copy, array->n_buffers - 3); + if (result != NANOARROW_OK) { + Rf_error("ArrowArrayAddVariadicBuffers() failed"); + } + } + + // Get buffer references, each of which preserve a reference to array_xptr + array_copy->n_buffers = array->n_buffers; + for (int64_t i = 0; i < array->n_buffers; i++) { + SEXP borrowed_buffer = + PROTECT(buffer_borrowed_xptr(array->buffers[i], 0, array_xptr)); + result = ArrowArraySetBuffer(array_copy, i, + (struct ArrowBuffer*)R_ExternalPtrAddr(borrowed_buffer)); + if (result != NANOARROW_OK) { + array_copy->release(array_copy); + Rf_error("ArrowArraySetBuffer() failed"); + } + UNPROTECT(1); + } + + // Swap out any children for independently releasable children and export them + // into array_copy->children + result = ArrowArrayAllocateChildren(array_copy, array->n_children); + if (result != NANOARROW_OK) { + array_copy->release(array_copy); + Rf_error("ArrowArrayAllocateChildren() failed"); + } + + for (int64_t i = 0; i < array->n_children; i++) { + array_ensure_independent(array->children[i]); + SEXP child_xptr = PROTECT(borrow_array_child_xptr(array_xptr, i)); + array_export(child_xptr, array_copy->children[i]); + UNPROTECT(1); + } + + if (array->dictionary != NULL) { + result = ArrowArrayAllocateDictionary(array_copy); + if (result != NANOARROW_OK) { + 
array_copy->release(array_copy); + Rf_error("ArrowArrayAllocateDictionary() failed"); + } + + array_ensure_independent(array->dictionary); + // Create a temporary xptr for the dictionary to recurse + SEXP dict_xptr = PROTECT(R_MakeExternalPtr(array->dictionary, R_NilValue, array_xptr)); + Rf_setAttrib(dict_xptr, R_ClassSymbol, nanoarrow_cls_array); + array_export(dict_xptr, array_copy->dictionary); + UNPROTECT(1); } } diff --git a/src/nanoarrow/common/array.c b/src/nanoarrow/common/array.c index c8d7bf32c..7184c827a 100644 --- a/src/nanoarrow/common/array.c +++ b/src/nanoarrow/common/array.c @@ -656,18 +656,18 @@ static ArrowErrorCode ArrowArrayCloneSharedInternal(struct ArrowArray* src, struct ArrowArray* dst) { NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromType(dst, NANOARROW_TYPE_UNINITIALIZED)); - // Allocate children and copy source children to dst children + // Allocate children and clone source children to dst children NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateChildren(dst, src->n_children)); for (int64_t i = 0; i < src->n_children; i++) { NANOARROW_RETURN_NOT_OK( - ArrowArrayMoveSharedInternal(src->children[i], dst->children[i])); + ArrowArrayCloneSharedInternal(src->children[i], dst->children[i])); } - // Allocate dictionary if needed and move source dictionary to dst dictionary + // Allocate dictionary if needed and clone source dictionary to dst dictionary if (src->dictionary != NULL) { NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateDictionary(dst)); NANOARROW_RETURN_NOT_OK( - ArrowArrayMoveSharedInternal(src->dictionary, dst->dictionary)); + ArrowArrayCloneSharedInternal(src->dictionary, dst->dictionary)); } // We might need some more buffers if we are shallowly copying a string/binary view From ad486e6a60be0b1461df7158937ea30b2a9dead0 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 6 Apr 2026 09:00:31 -0500 Subject: [PATCH 09/10] revert change --- r/src/array.h | 105 +++++++++++++++++++++++++++----------------------- 1 file changed, 56 insertions(+), 
49 deletions(-) diff --git a/r/src/array.h b/r/src/array.h index 57f43a75a..96c2849a9 100644 --- a/r/src/array.h +++ b/r/src/array.h @@ -65,41 +65,8 @@ static inline struct ArrowSchema* schema_from_array_xptr(SEXP array_xptr) { } } -// When arrays arrive as a nanoarrow_array, they are responsible for -// releasing their children. This is fine until we need to keep one -// child alive (e.g., a column of a data frame that we attach to an -// ALTREP array) or until we need to export it (i.e., comply with -// https://arrow.apache.org/docs/format/CDataInterface.html#moving-child-arrays -// where child arrays must be movable). To make this work we need to do a shuffle: we -// move the child array to a new owning external pointer and -// give an exported version back to the original object. This only -// applies if the array_xptr has the external pointer 'prot' field -// set (if it doesn't have that set, it is already independent). -static inline void array_ensure_independent(struct ArrowArray* array) { - SEXP shelter_xptr = PROTECT(nanoarrow_array_owning_xptr()); - struct ArrowArray* tmp = nanoarrow_output_array_from_xptr(shelter_xptr); - - ArrowErrorCode result = ArrowArrayMoveShared(array, tmp); - if (result != NANOARROW_OK) { - Rf_error("ArrowArrayMoveShared() failed"); - } - - ArrowArrayMove(tmp, array); - UNPROTECT(1); -} - -// This version is like the version that operates on a raw struct ArrowArray* -// except it checks if this array has any array dependencies by inspecing the 'Protected' -// field of the external pointer: if it that field is R_NilValue, it is already -// independent. 
-static inline void array_xptr_ensure_independent(SEXP array_xptr) { - struct ArrowArray* array = nanoarrow_array_from_xptr(array_xptr); - if (R_ExternalPtrProtected(array_xptr) == R_NilValue) { - return; - } - - array_ensure_independent(array); -} +static inline SEXP array_ensure_independent(struct ArrowArray* array); +static inline SEXP array_xptr_ensure_independent(SEXP array_xptr); // Exports a version of the array pointed to by array_xptr to array_copy // such that (1) any R references to array_xptr are not invalidated if they exist @@ -112,10 +79,10 @@ static inline void array_xptr_ensure_independent(SEXP array_xptr) { // we can modify them safely (i.e., using ArrowArraySetBuffer()). static inline void array_export(SEXP array_xptr, struct ArrowArray* array_copy) { // If array_xptr has SEXP dependencies (most commonly this would occur if it's - // a borrowed child of a struct array), ensure a version that can be released - // independently. For non-borrowed arrays this is a no-op. - array_xptr_ensure_independent(array_xptr); - struct ArrowArray* array = nanoarrow_array_from_xptr(array_xptr); + // a borrowed child of a struct array), this will ensure a version that can be + // released independently of its parent. 
+ SEXP independent_array_xptr = PROTECT(array_xptr_ensure_independent(array_xptr)); + struct ArrowArray* array = nanoarrow_array_from_xptr(independent_array_xptr); int result = ArrowArrayInitFromType(array_copy, NANOARROW_TYPE_UNINITIALIZED); if (result != NANOARROW_OK) { @@ -126,6 +93,8 @@ static inline void array_export(SEXP array_xptr, struct ArrowArray* array_copy) array_copy->null_count = array->null_count; array_copy->offset = array->offset; + // Get buffer references, each of which preserve a reference to independent_array_xptr + // We might need some more buffers to be allocated for string views if (array->n_buffers > 3) { result = ArrowArrayAddVariadicBuffers(array_copy, array->n_buffers - 3); @@ -134,11 +103,10 @@ static inline void array_export(SEXP array_xptr, struct ArrowArray* array_copy) } } - // Get buffer references, each of which preserve a reference to array_xptr array_copy->n_buffers = array->n_buffers; for (int64_t i = 0; i < array->n_buffers; i++) { SEXP borrowed_buffer = - PROTECT(buffer_borrowed_xptr(array->buffers[i], 0, array_xptr)); + PROTECT(buffer_borrowed_xptr(array->buffers[i], 0, independent_array_xptr)); result = ArrowArraySetBuffer(array_copy, i, (struct ArrowBuffer*)R_ExternalPtrAddr(borrowed_buffer)); if (result != NANOARROW_OK) { @@ -157,9 +125,8 @@ static inline void array_export(SEXP array_xptr, struct ArrowArray* array_copy) } for (int64_t i = 0; i < array->n_children; i++) { - array_ensure_independent(array->children[i]); - SEXP child_xptr = PROTECT(borrow_array_child_xptr(array_xptr, i)); - array_export(child_xptr, array_copy->children[i]); + SEXP independent_child = PROTECT(array_ensure_independent(array->children[i])); + array_export(independent_child, array_copy->children[i]); UNPROTECT(1); } @@ -170,13 +137,53 @@ static inline void array_export(SEXP array_xptr, struct ArrowArray* array_copy) Rf_error("ArrowArrayAllocateDictionary() failed"); } - array_ensure_independent(array->dictionary); - // Create a temporary xptr 
for the dictionary to recurse - SEXP dict_xptr = PROTECT(R_MakeExternalPtr(array->dictionary, R_NilValue, array_xptr)); - Rf_setAttrib(dict_xptr, R_ClassSymbol, nanoarrow_cls_array); - array_export(dict_xptr, array_copy->dictionary); + SEXP independent_dictionary = PROTECT(array_ensure_independent(array->dictionary)); + array_export(independent_dictionary, array_copy->dictionary); UNPROTECT(1); } + + UNPROTECT(1); +} + +// When arrays arrive as a nanoarrow_array, they are responsible for +// releasing their children. This is fine until we need to keep one +// child alive (e.g., a column of a data frame that we attach to an +// ALTREP array) or until we need to export it (i.e., comply with +// https://arrow.apache.org/docs/format/CDataInterface.html#moving-child-arrays +// where child arrays must be movable). To make this work we need to do a shuffle: we +// move the child array to a new owning external pointer and +// give an exported version back to the original object. This only +// applies if the array_xptr has the external pointer 'prot' field +// set (if it doesn't have that set, it is already independent). 
+static inline SEXP array_ensure_independent(struct ArrowArray* array) { + SEXP original_array_xptr = PROTECT(nanoarrow_array_owning_xptr()); + + // Move array to the newly created owner + struct ArrowArray* original_array = + nanoarrow_output_array_from_xptr(original_array_xptr); + memcpy(original_array, array, sizeof(struct ArrowArray)); + array->release = NULL; + + // Export the independent array (which keeps a reference to original_array_xptr) + // back to the original home + array_export(original_array_xptr, array); + + // Return the external pointer of the independent array + UNPROTECT(1); + return original_array_xptr; +} + +// This version is like the version that operates on a raw struct ArrowArray* +// except it checks if this array has any array dependencies by inspecing the 'Protected' +// field of the external pointer: if it that field is R_NilValue, it is already +// independent. +static inline SEXP array_xptr_ensure_independent(SEXP array_xptr) { + struct ArrowArray* array = nanoarrow_array_from_xptr(array_xptr); + if (R_ExternalPtrProtected(array_xptr) == R_NilValue) { + return array_xptr; + } + + return array_ensure_independent(array); } #endif From a49c349bab756d3566031ef3ecdcd4541701e49c Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sat, 2 May 2026 22:01:34 -0500 Subject: [PATCH 10/10] rebase fixes Co-authored-by: Copilot --- src/nanoarrow/ipc/decoder.c | 6 +++--- src/nanoarrow/ipc/decoder_test.cc | 14 +++++++------- src/nanoarrow/ipc/reader.c | 6 +++--- src/nanoarrow/nanoarrow_ipc.h | 10 +++++----- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/nanoarrow/ipc/decoder.c b/src/nanoarrow/ipc/decoder.c index f266c740b..ef709b2a6 100644 --- a/src/nanoarrow/ipc/decoder.c +++ b/src/nanoarrow/ipc/decoder.c @@ -2086,7 +2086,7 @@ static struct ArrowIpcBufferFactory ArrowIpcBufferFactoryFromShared( struct ArrowIpcBufferFactory out; out.make_buffer = &ArrowIpcMakeBufferFromShared; out.decompressor = NULL; - out.buffer_length = 
shared->private_src.size_bytes; + out.buffer_length = shared->size_bytes; out.private_data = shared; return out; } @@ -2568,7 +2568,7 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder, } ArrowErrorCode ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( - struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* body, int64_t i, + struct ArrowIpcDecoder* decoder, struct ArrowBuffer* body, int64_t i, struct ArrowIpcDictionaries* dictionaries, struct ArrowArray* out, enum ArrowValidationLevel validation_level, struct ArrowError* error) { struct ArrowArrayView* array_view; @@ -2679,7 +2679,7 @@ NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionary( } NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionaryFromShared( - struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared, + struct ArrowIpcDecoder* decoder, struct ArrowBuffer* shared, enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries* dictionaries, struct ArrowError* error) { NANOARROW_DCHECK(decoder != NULL); diff --git a/src/nanoarrow/ipc/decoder_test.cc b/src/nanoarrow/ipc/decoder_test.cc index 694310a9c..f1c8164a2 100644 --- a/src/nanoarrow/ipc/decoder_test.cc +++ b/src/nanoarrow/ipc/decoder_test.cc @@ -853,8 +853,8 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeDictionaryBatch) { ArrowBufferInit(&record_batch_body); ASSERT_EQ(ArrowBufferAppendBufferView(&record_batch_body, data), NANOARROW_OK); - struct ArrowIpcSharedBuffer record_batch_shared; - ASSERT_EQ(ArrowIpcSharedBufferInit(&record_batch_shared, &record_batch_body), + struct ArrowBuffer record_batch_shared; + ASSERT_EQ(ArrowSharedBufferInit(&record_batch_shared, &record_batch_body), NANOARROW_OK); ASSERT_EQ(ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( @@ -865,7 +865,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeDictionaryBatch) { ASSERT_NE(batch.children[0]->dictionary, nullptr); ASSERT_EQ(batch.children[0]->dictionary->length, 3); ArrowArrayRelease(&batch); - 
ArrowIpcSharedBufferReset(&record_batch_shared); + ArrowBufferReset(&record_batch_shared); ArrowArrayViewReset(&array_view); ArrowIpcDictionariesReset(&dictionaries); @@ -971,7 +971,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchFromShared) { NANOARROW_OK); struct ArrowBuffer shared; - ASSERT_EQ(ArrowIpcSharedBufferInit(&shared, &body), NANOARROW_OK); + ASSERT_EQ(ArrowSharedBufferInit(&shared, &body), NANOARROW_OK); // Check full struct extract EXPECT_EQ(ArrowIpcDecoderDecodeArrayFromShared( @@ -1010,8 +1010,8 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchFromShared) { } TEST(NanoarrowIpcTest, NanoarrowIpcSharedBufferThreadSafeDecode) { - if (!ArrowIpcSharedBufferIsThreadSafe()) { - GTEST_SKIP() << "ArrowIpcSharedBufferIsThreadSafe() returned false"; + if (!ArrowSharedBufferIsThreadSafe()) { + GTEST_SKIP() << "ArrowSharedBufferIsThreadSafe() returned false"; } struct ArrowIpcDecoder decoder; @@ -1040,7 +1040,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcSharedBufferThreadSafeDecode) { NANOARROW_OK); struct ArrowBuffer shared; - ASSERT_EQ(ArrowIpcSharedBufferInit(&shared, &body), NANOARROW_OK); + ASSERT_EQ(ArrowSharedBufferInit(&shared, &body), NANOARROW_OK); struct ArrowArray arrays[10]; for (int i = 0; i < 10; i++) { diff --git a/src/nanoarrow/ipc/reader.c b/src/nanoarrow/ipc/reader.c index 4cb04d799..f69d441f2 100644 --- a/src/nanoarrow/ipc/reader.c +++ b/src/nanoarrow/ipc/reader.c @@ -491,11 +491,11 @@ static int ArrowIpcArrayStreamReaderProcessDictionary( // Decode the dictionary struct ArrowBuffer shared; NANOARROW_RETURN_NOT_OK_WITH_ERROR( - ArrowIpcSharedBufferInit(&shared, &private_data->body), &private_data->error); + ArrowSharedBufferInit(&shared, &private_data->body), &private_data->error); int result = ArrowIpcDecoderDecodeDictionaryFromShared( &private_data->decoder, &shared, NANOARROW_VALIDATION_LEVEL_FULL, &private_data->dictionaries, &private_data->error); - ArrowIpcSharedBufferReset(&shared); + ArrowBufferReset(&shared); 
NANOARROW_RETURN_NOT_OK(result); } else { struct ArrowBufferView body_view; @@ -603,7 +603,7 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit( private_data->use_shared_buffers = options->use_shared_buffers; } else { private_data->field_index = -1; - private_data->use_shared_buffers = ArrowIpcSharedBufferIsThreadSafe(); + private_data->use_shared_buffers = ArrowSharedBufferIsThreadSafe(); } out->private_data = private_data; diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h index 8031bd323..3632bd777 100644 --- a/src/nanoarrow/nanoarrow_ipc.h +++ b/src/nanoarrow/nanoarrow_ipc.h @@ -644,7 +644,7 @@ NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArray( /// ArrowSharedBufferIsThreadSafe() returns 0, out must not be released by another /// thread. NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeArrayFromSharedWithDictionaries( - struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared, int64_t i, + struct ArrowIpcDecoder* decoder, struct ArrowBuffer* shared, int64_t i, struct ArrowIpcDictionaries* dictionaries, struct ArrowArray* out, enum ArrowValidationLevel validation_level, struct ArrowError* error); @@ -673,12 +673,12 @@ NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionary( /// \brief Decode an ArrowArray from a dictionary batch from an owned buffer /// /// This implementation takes advantage of the fact that it can avoid copying individual -/// buffers. In all cases the caller must ArrowIpcSharedBufferReset() body after one or +/// buffers. In all cases the caller must ArrowBufferReset() body after one or /// more calls to ArrowIpcDecoderDecodeArrayFromShared(). If -/// ArrowIpcSharedBufferIsThreadSafe() returns 0, no batches decoded using out may +/// ArrowSharedBufferIsThreadSafe() returns 0, no batches decoded using out may /// be released from another thread. 
NANOARROW_DLL ArrowErrorCode ArrowIpcDecoderDecodeDictionaryFromShared( - struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* shared, + struct ArrowIpcDecoder* decoder, struct ArrowBuffer* shared, enum ArrowValidationLevel validation_level, struct ArrowIpcDictionaries* out, struct ArrowError* error); @@ -735,7 +735,7 @@ struct ArrowIpcArrayStreamReaderOptions { /// (since unreferenced portions of the file are often not loaded into memory) or /// (2) if all data from all columns are about to be referenced anyway. When loading /// a single field there is probably no advantage to using shared buffers. - /// Defaults to the value of ArrowIpcSharedBufferIsThreadSafe(). + /// Defaults to the value of ArrowSharedBufferIsThreadSafe(). int use_shared_buffers; };