Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ option(MRC_USE_CONDA "Enables finding dependencies via conda. All dependencies m
environment" ON)
option(MRC_USE_IWYU "Enable running include-what-you-use as part of the build process" OFF)

set(MRC_RAPIDS_VERSION "23.12" CACHE STRING "Which version of RAPIDS to build for. Sets default versions for RAPIDS CMake and RMM.")
set(MRC_RAPIDS_VERSION "24.04" CACHE STRING "Which version of RAPIDS to build for. Sets default versions for RAPIDS CMake and RMM.")

set(MRC_CACHE_DIR "${CMAKE_SOURCE_DIR}/.cache" CACHE PATH "Directory to contain all CPM and CCache data")
mark_as_advanced(MRC_CACHE_DIR)
Expand Down Expand Up @@ -132,12 +132,15 @@ set(CMAKE_CXX_EXTENSIONS ON)
set(CMAKE_POSITION_INDEPENDENT_CODE TRUE)
set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)

# add_compile_options("$<$<COMPILE_LANGUAGE:CXX>:-fsanitize=address>")
# add_compile_options("$<$<COMPILE_LANGUAGE:CXX>:-fsanitize=address>")

# Setup cache before dependencies
# Configure CCache if requested
include(environment/init_ccache)

# Disable exporting compile commands for dependencies
set(CMAKE_EXPORT_COMPILE_COMMANDS OFF)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

# Create a custom target to allow preparing for style checks
add_custom_target(${PROJECT_NAME}_style_checks
Expand Down
14 changes: 6 additions & 8 deletions cmake/Configure_UCXX.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,26 @@

include_guard(GLOBAL)

find_package(ucx REQUIRED)

function(morpheus_utils_configure_UCXX)
list(APPEND CMAKE_MESSAGE_CONTEXT "UCXX")

morpheus_utils_assert_cpm_initialized()
set(UCXX_VERSION "0.37.00" CACHE STRING "Which version of UCXX to use.")
set(UCXX_VERSION "0.39.00" CACHE STRING "Which version of UCXX to use.")

find_package(ucx REQUIRED)

# TODO(MDD): Switch back to the official UCXX repo once the following PR is merged:
# https://github.com/rapidsai/ucxx/pull/166
rapids_cpm_find(ucxx ${UCXX_VERSION}
GLOBAL_TARGETS
ucxx ucxx::ucxx ucx::ucs ucx::ucp
ucxx::ucxx ucxx::python
BUILD_EXPORT_SET
${PROJECT_NAME}-core-exports
INSTALL_EXPORT_SET
${PROJECT_NAME}-core-exports
CPM_ARGS
GIT_REPOSITORY https://github.com/pentschev/ucxx.git
GIT_TAG mrc-all
GIT_REPOSITORY https://github.com/rapidsai/ucxx.git
GIT_TAG branch-0.39
GIT_SHALLOW TRUE
SOURCE_SUBDIR cpp
OPTIONS "UCXX_ENABLE_RMM ON"
Expand All @@ -46,5 +46,3 @@ function(morpheus_utils_configure_UCXX)
list(POP_BACK CMAKE_MESSAGE_CONTEXT)

endfunction()

message(STATUS "Loaded UCXX Configuration functions")
4 changes: 2 additions & 2 deletions conda/environments/all_cuda-118_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ dependencies:
- libclang=17
- libgrpc=1.54.0
- libhwloc=2.9.2
- librmm=23.12
- librmm=24.04
- llvmdev=17
- ninja=1.10
- nlohmann_json=3.9
Expand All @@ -53,6 +53,6 @@ dependencies:
- pytest-timeout
- python=3.10
- scikit-build>=0.17
- ucx=1.14
- ucx>=1.15
- yapf
name: all_cuda-118_arch-x86_64
4 changes: 2 additions & 2 deletions conda/environments/ci_cuda-118_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dependencies:
- include-what-you-use=0.21
- libgrpc=1.54.0
- libhwloc=2.9.2
- librmm=23.12
- librmm=24.04
- ninja=1.10
- nlohmann_json=3.9
- nodejs=18
Expand All @@ -43,6 +43,6 @@ dependencies:
- pytest-timeout
- python=3.10
- scikit-build>=0.17
- ucx=1.14
- ucx>=1.15
- yapf
name: ci_cuda-118_arch-x86_64
2 changes: 1 addition & 1 deletion cpp/mrc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ target_link_libraries(libmrc
gRPC::grpc++
gRPC::grpc
gRPC::gpr
ucxx
ucxx::ucxx
PRIVATE
hwloc::hwloc
prometheus-cpp::core # private in MR !199
Expand Down
2 changes: 1 addition & 1 deletion cpp/mrc/include/mrc/codable/encode.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ std::unique_ptr<LocalSerializedWrapper> encode2(const T& obj,

auto encoded_string = encoded_object->proto().DebugString();

VLOG(10) << "Encoded object proto: \n" << encoded_string;
VLOG(20) << "Encoded object proto: \n" << encoded_string;

return encoded_object;
}
Expand Down
127 changes: 127 additions & 0 deletions cpp/mrc/include/mrc/codable/fundamental_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
#include "mrc/codable/encode.hpp"
#include "mrc/codable/encoding_options.hpp"
#include "mrc/codable/types.hpp"
#include "mrc/memory/literals.hpp"
#include "mrc/memory/memory_kind.hpp"
#include "mrc/utils/tuple_utils.hpp"

#include <type_traits>
#include <typeindex>
Expand Down Expand Up @@ -135,4 +137,129 @@ struct codable_protocol<T, std::enable_if_t<std::is_same_v<T, std::string>>>
}
};

template <typename T>
struct codable_protocol<std::vector<T>>
{
static void serialize(const std::vector<T>& obj,
mrc::codable::Encoder<std::vector<T>>& encoder,
const mrc::codable::EncodingOptions& opts)
{
// First put in the size
mrc::codable::encode2(obj.size(), encoder, opts);

// Now encode each object
for (const auto& o : obj)
{
mrc::codable::encode2(o, encoder, opts);
}
}

static void serialize(const std::vector<T>& obj,
mrc::codable::Encoder2<std::vector<T>>& encoder,
const mrc::codable::EncodingOptions& opts)
{
using namespace mrc::memory::literals;

// First put in the size
mrc::codable::encode2(obj.size(), encoder, opts);

if constexpr (std::is_fundamental_v<T>)
{
// Since these are fundamental types, just encode in a single memory block
encoder.write_descriptor({obj.data(), obj.size() * sizeof(T), memory::memory_kind::host},
obj.size() * sizeof(T) > 1_MiB ? DescriptorKind::Eager : DescriptorKind::Deferred);
}
else
{
// Now encode each object
for (const auto& o : obj)
{
mrc::codable::encode2(o, encoder, opts);
}
}
}

static std::vector<T> deserialize(const Decoder<std::vector<T>>& decoder, std::size_t object_idx)
{
DCHECK_EQ(std::type_index(typeid(std::vector<T>)).hash_code(), decoder.type_index_hash_for_object(object_idx));

auto count = mrc::codable::decode2<size_t>(decoder, object_idx);

auto object = std::vector<T>(count);

auto idx = decoder.start_idx_for_object(object_idx);
auto bytes = decoder.buffer_size(idx);

decoder.copy_from_buffer(idx, {object.data(), count * sizeof(T), memory::memory_kind::host});

return object;
}

static std::vector<T> deserialize(const Decoder2<std::vector<T>>& decoder, std::size_t object_idx)
{
// DCHECK_EQ(std::type_index(typeid(std::vector<T>)).hash_code(),
// decoder.type_index_hash_for_object(object_idx));

auto count = mrc::codable::decode2<size_t>(decoder, object_idx);

auto object = std::vector<T>(count);

if constexpr (std::is_fundamental_v<T>)
{
decoder.read_descriptor(0, {object.data(), count * sizeof(T), memory::memory_kind::host});
}
else
{
// Now decode each object
for (size_t i = 0; i < count; ++i)
{
object[i] = mrc::codable::decode2<T>(decoder, object_idx);
}
}

return object;
}
};

template <typename... Ts>
struct codable_protocol<std::tuple<Ts...>>
{
static void serialize(const std::tuple<Ts...>& obj,
mrc::codable::Encoder<std::tuple<Ts...>>& encoder,
const mrc::codable::EncodingOptions& opts)
{
mrc::utils::tuple_for_each(obj, [&](const auto& o, std::size_t idx) {
mrc::codable::encode2(o, encoder, opts);
});
}

static void serialize(const std::tuple<Ts...>& obj,
mrc::codable::Encoder2<std::tuple<Ts...>>& encoder,
const mrc::codable::EncodingOptions& opts)
{
mrc::utils::tuple_for_each(obj, [&](const auto& o, std::size_t idx) {
mrc::codable::encode2(o, encoder, opts);
});
}

static std::tuple<Ts...> deserialize(const Decoder<std::tuple<Ts...>>& decoder, std::size_t object_idx)
{
DCHECK_EQ(std::type_index(typeid(std::tuple<Ts...>)).hash_code(),
decoder.type_index_hash_for_object(object_idx));

auto object = std::tuple<Ts...>{mrc::codable::decode2<Ts>(decoder, object_idx)...};

return object;
}

static std::tuple<Ts...> deserialize(const Decoder2<std::tuple<Ts...>>& decoder, std::size_t object_idx)
{
// DCHECK_EQ(std::type_index(typeid(std::vector<T>)).hash_code(),
// decoder.type_index_hash_for_object(object_idx));
auto object = std::tuple<Ts...>{mrc::codable::decode2<Ts>(decoder, object_idx)...};

return object;
}
};

} // namespace mrc::codable
11 changes: 6 additions & 5 deletions cpp/mrc/include/mrc/core/addresses.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,19 @@ extern SegmentAddress segment_address_encode(SegmentID id, SegmentRank rank);
* @param [in] segment_id The ID of the segment
* @returns SegmentAddress
**/
SegmentAddress2 segment_address_encode2(ExecutorID2 exeuctor_id,
PipelineID2 pipeline_id,
SegmentHash2 segment_hash,
SegmentID2 segment_id);
SegmentAddressCombined2 segment_address_encode2(ExecutorID2 exeuctor_id,
PipelineID2 pipeline_id,
SegmentHash2 segment_hash,
SegmentID2 segment_id);

/**
* @brief Decodes a SegmentAddress into a tuple of ExecutorID, PipelineID, SegmentHash, and SegmentID
*
* @param address
* @return std::tuple<ExecutorID2, PipelineID2, SegmentHash2, SegmentID2>
*/
std::tuple<ExecutorID2, PipelineID2, SegmentHash2, SegmentID2> segment_address_decode2(const SegmentAddress2& address);
std::tuple<ExecutorID2, PipelineID2, SegmentHash2, SegmentID2> segment_address_decode2(
const SegmentAddressCombined2& address);

/**
* @brief Decodes a SegmentAddress
Expand Down
Loading