From 02970991a143ae3b98c62b21cebde2b2ad493c5a Mon Sep 17 00:00:00 2001 From: Will Berkeley Date: Tue, 19 Aug 2025 11:14:43 -0700 Subject: [PATCH 1/6] cloud_topics/reconciler: Use L1 io and object primitives This alters the reconciler so it builds and uploads proper L1 objects, using the l1::io and l1::object_builder abstractions. A follow-up will change the commit and LRO offset portions of the reconciler to use the metastore and the ctp_stm. --- src/v/cloud_topics/BUILD | 2 + src/v/cloud_topics/app.cc | 17 +- src/v/cloud_topics/app.h | 2 + src/v/cloud_topics/level_one/common/BUILD | 21 ++ src/v/cloud_topics/reconciler/BUILD | 15 +- .../reconciler/range_batch_consumer.cc | 66 ------ src/v/cloud_topics/reconciler/reconciler.cc | 218 ++++++++++-------- src/v/cloud_topics/reconciler/reconciler.h | 82 +++---- .../reconciler/reconciliation_consumer.cc | 62 +++++ ...h_consumer.h => reconciliation_consumer.h} | 34 +-- src/v/cloud_topics/reconciler/tests/BUILD | 9 +- .../tests/range_batch_consumer_test.cc | 53 ----- .../tests/reconciliation_consumer_test.cc | 84 +++++++ src/v/config/node_config.h | 4 + 14 files changed, 376 insertions(+), 293 deletions(-) delete mode 100644 src/v/cloud_topics/reconciler/range_batch_consumer.cc create mode 100644 src/v/cloud_topics/reconciler/reconciliation_consumer.cc rename src/v/cloud_topics/reconciler/{range_batch_consumer.h => reconciliation_consumer.h} (64%) delete mode 100644 src/v/cloud_topics/reconciler/tests/range_batch_consumer_test.cc create mode 100644 src/v/cloud_topics/reconciler/tests/reconciliation_consumer_test.cc diff --git a/src/v/cloud_topics/BUILD b/src/v/cloud_topics/BUILD index 38092a3c2d304..2df9b53b7800f 100644 --- a/src/v/cloud_topics/BUILD +++ b/src/v/cloud_topics/BUILD @@ -94,11 +94,13 @@ redpanda_cc_library( ":data_plane_api", "//src/v/base", "//src/v/cloud_topics:state_accessors", + "//src/v/cloud_topics/level_one/common:file_io", "//src/v/cloud_topics/level_one/domain:domain_supervisor", "//src/v/cloud_topics/level_one/metastore:frontend", "//src/v/cloud_topics/level_zero/common:extent_meta", "//src/v/cloud_topics/reconciler", "//src/v/cluster", + "//src/v/config", "//src/v/container:chunked_vector", "//src/v/model", "//src/v/ssx:sharded_service_container", diff --git a/src/v/cloud_topics/app.cc b/src/v/cloud_topics/app.cc index 486b5d132497c..ba2c83e31aab7 100644 --- a/src/v/cloud_topics/app.cc +++ b/src/v/cloud_topics/app.cc @@ -15,6 +15,7 @@ #include "cloud_topics/data_plane_impl.h" #include "cluster/cluster_epoch_service.h" #include "cluster/controller.h" +#include "config/node_config.h" #include "ssx/sharded_service_container.h" #include @@ -49,8 +50,22 @@ ss::future<> app::construct( co_await construct_service(state, data_plane.get()); + // Ensure the L1 staging directory exists before creating the l1_io service + co_await ss::recursive_touch_directory( + config::node().l1_staging_path().string()); + + co_await construct_service( + l1_io, + config::node().l1_staging_path(), + ss::sharded_parameter([&remote] { return &remote->local(); }), + bucket, + ss::sharded_parameter([&cloud_cache] { return &cloud_cache->local(); })); + co_await construct_service( - reconciler, partition_mgr, remote, data_plane.get(), bucket); + reconciler, + partition_mgr, + data_plane.get(), + ss::sharded_parameter([this] { return &l1_io.local(); })); co_await construct_service(domain_supervisor, controller); co_await construct_service( l1_metastore_fe, diff --git a/src/v/cloud_topics/app.h b/src/v/cloud_topics/app.h index 7cfceef19f118..9096fc9b62fd1 100644 --- a/src/v/cloud_topics/app.h +++ b/src/v/cloud_topics/app.h @@ -10,6 +10,7 @@ #pragma once +#include "cloud_topics/level_one/common/file_io.h" #include "cloud_topics/level_one/domain/domain_supervisor.h" #include "cloud_topics/level_one/metastore/frontend.h" #include "cloud_topics/reconciler/reconciler.h" @@ -71,6 +72,7 @@ class app : public ssx::sharded_service_container { ss::sstring _logger_name; std::unique_ptr data_plane; ss::sharded state; + ss::sharded l1_io; ss::sharded reconciler; ss::sharded domain_supervisor; ss::sharded l1_metastore_fe; diff --git a/src/v/cloud_topics/level_one/common/BUILD b/src/v/cloud_topics/level_one/common/BUILD index 71a1ff0e641db..b9257ee433a74 100644 --- a/src/v/cloud_topics/level_one/common/BUILD +++ b/src/v/cloud_topics/level_one/common/BUILD @@ -4,13 +4,21 @@ package(default_visibility = ["//src/v/cloud_topics/level_one:__subpackages__"]) redpanda_cc_library( name = "object_id", + srcs = [ + "object_id.cc", + ], hdrs = [ "object_id.h", ], + visibility = [ + "//src/v/cloud_topics/level_one:__subpackages__", + "//src/v/cloud_topics/reconciler:__subpackages__", + ], deps = [ "//src/v/base", "//src/v/utils:named_type", "//src/v/utils:uuid", + "@fmt", ], ) @@ -48,6 +56,10 @@ redpanda_cc_library( "//src/v/serde:vector", "//src/v/storage:record_batch_builder", ], + visibility = [ + "//src/v/cloud_topics/level_one:__subpackages__", + "//src/v/cloud_topics/reconciler:__subpackages__", + ], deps = [ "//src/v/base", "//src/v/container:chunked_vector", @@ -63,6 +75,10 @@ redpanda_cc_library( name = "abstract_io", srcs = ["abstract_io.cc"], hdrs = ["abstract_io.h"], + visibility = [ + "//src/v/cloud_topics/level_one:__subpackages__", + "//src/v/cloud_topics/reconciler:__subpackages__", + ], deps = [ ":object_id", "//src/v/container:chunked_vector", @@ -75,6 +91,11 @@ redpanda_cc_library( name = "file_io", srcs = ["file_io.cc"], hdrs = ["file_io.h"], + visibility = [ + "//src/v/cloud_topics:__pkg__", + "//src/v/cloud_topics/level_one:__subpackages__", + "//src/v/cloud_topics/reconciler:__pkg__", + ], deps = [ ":abstract_io", ":object_id", diff --git a/src/v/cloud_topics/reconciler/BUILD b/src/v/cloud_topics/reconciler/BUILD index 447dc831a0ba8..62d75965da1bb 100644 --- a/src/v/cloud_topics/reconciler/BUILD +++ b/src/v/cloud_topics/reconciler/BUILD @@ -3,16 +3,16 @@ load("//bazel:build.bzl", "redpanda_cc_library") package(default_visibility = [":__subpackages__"]) redpanda_cc_library( - name = "range_batch_consumer", + name = "reconciliation_consumer", srcs = [ - "range_batch_consumer.cc", + "reconciliation_consumer.cc", ], hdrs = [ - "range_batch_consumer.h", + "reconciliation_consumer.h", ], deps = [ "//src/v/base", - "//src/v/bytes:iobuf", + "//src/v/cloud_topics/level_one/common:object", "//src/v/model", "@abseil-cpp//absl/container:btree", "@seastar", @@ -33,13 +33,14 @@ redpanda_cc_library( ], visibility = ["//visibility:public"], deps = [ - ":range_batch_consumer", + ":reconciliation_consumer", "//src/v/base", - "//src/v/cloud_io:remote", - "//src/v/cloud_storage", "//src/v/cloud_storage_clients", "//src/v/cloud_topics:object_utils", "//src/v/cloud_topics:types", + "//src/v/cloud_topics/level_one/common:file_io", + "//src/v/cloud_topics/level_one/common:object", + "//src/v/cloud_topics/level_one/common:object_id", "//src/v/cloud_topics/level_one/common:object_utils", "//src/v/cloud_topics/level_zero/stm:ctp_stm_api", "//src/v/cluster", diff --git a/src/v/cloud_topics/reconciler/range_batch_consumer.cc b/src/v/cloud_topics/reconciler/range_batch_consumer.cc deleted file mode 100644 index 08f07e7e7e03a..0000000000000 --- a/src/v/cloud_topics/reconciler/range_batch_consumer.cc +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2025 Redpanda Data, Inc. - * - * Licensed as a Redpanda Enterprise file under the Redpanda Community - * License (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md - */ - -#include "cloud_topics/reconciler/range_batch_consumer.h" - -#include "model/timestamp.h" - -namespace cloud_topics::reconciler { - -ss::future -range_batch_consumer::operator()(model::record_batch batch) { - if (!_base_offset.has_value()) { - _base_offset = model::offset_cast(batch.base_offset()); - } - // NOTE: we only have data batches here so it's safe - // to use timestamps without checking the batch type - if (_range.info.base_timestamp == model::timestamp{}) { - _range.info.base_timestamp = batch.header().first_timestamp; - } else { - _range.info.base_timestamp = std::min( - batch.header().first_timestamp, _range.info.base_timestamp); - } - _range.info.last_timestamp = std::max( - batch.header().max_timestamp, _range.info.last_timestamp); - _range.info.last_offset = model::offset_cast(batch.last_offset()); - - bool add_term = false; - if (_range.info.terms.empty()) { - // Always add term of the first batch in the sequence. We don't really - // know if it's the first batch in the term so some filtering will be - // needed later. - add_term = true; - } else { - auto term = _range.info.terms.rbegin()->first; - if (term != batch.term()) { - add_term = true; - } - } - if (add_term) { - _range.info.terms.insert( - std::make_pair( - batch.term(), model::offset_cast(batch.base_offset()))); - } - - auto data = serde::to_iobuf(std::move(batch)); - _range.data.append(std::move(data)); - - co_return ss::stop_iteration::no; -} - -std::optional range_batch_consumer::end_of_stream() { - if (_base_offset.has_value()) { - _range.info.base_offset = _base_offset.value(); - return std::move(_range); - } - return std::nullopt; -} - -} // namespace cloud_topics::reconciler diff --git a/src/v/cloud_topics/reconciler/reconciler.cc b/src/v/cloud_topics/reconciler/reconciler.cc index 345fbfbe1821e..900a5cc0bc1f9 100644 --- a/src/v/cloud_topics/reconciler/reconciler.cc +++ b/src/v/cloud_topics/reconciler/reconciler.cc @@ -11,13 +11,9 @@ #include "cloud_topics/reconciler/reconciler.h" #include "base/vlog.h" -#include "cloud_storage/configuration.h" #include "cloud_topics/data_plane_api.h" #include "cloud_topics/frontend/frontend.h" -#include "cloud_topics/level_one/common/object_utils.h" -#include "cloud_topics/level_zero/stm/ctp_stm_api.h" -#include "cloud_topics/object_utils.h" -#include "cloud_topics/types.h" +#include "cloud_topics/reconciler/reconciliation_consumer.h" #include "cluster/partition.h" #include "kafka/utils/txn_reader.h" #include "model/namespace.h" @@ -27,12 +23,8 @@ #include namespace { -ss::logger lg("reconciler"); +ss::logger rlog("reconciler"); -/* - * Temporary hack for identifying cloud partitions (topic has "_ct" suffix). - * This can be removed once we teach Redpanda about this new type of topic. - */ bool is_cloud_partition( const ss::lw_shared_ptr& partition) { return partition->get_ntp_config().cloud_topic_enabled(); @@ -64,28 +56,13 @@ namespace cloud_topics::reconciler { reconciler::reconciler( ss::sharded* pm, - ss::sharded* cloud_io, data_plane_api* data_plane, - std::optional bucket) + l1::file_io* l1_io) : _partition_manager(pm) - , _cloud_io(cloud_io) - , _data_plane(data_plane) { - if (bucket.has_value()) { - _bucket = std::move(bucket.value()); - } else { - _bucket = cloud_storage_clients::bucket_name( - cloud_storage::configuration::get_bucket_config().value().value()); - } -} + , _data_plane(data_plane) + , _l1_io(l1_io) {} ss::future<> reconciler::start() { - _manage_notify_handle - = _partition_manager->local().register_manage_notification( - model::kafka_namespace, - [this](ss::lw_shared_ptr p) { - attach_partition(std::move(p)); - }); - _unmanage_notify_handle = _partition_manager->local().register_unmanage_notification( model::kafka_namespace, [this](model::topic_partition_view tp_p) { @@ -93,6 +70,13 @@ ss::future<> reconciler::start() { model::ntp(model::kafka_namespace, tp_p.topic, tp_p.partition)); }); + _manage_notify_handle + = _partition_manager->local().register_manage_notification( + model::kafka_namespace, + [this](ss::lw_shared_ptr p) { + attach_partition(std::move(p)); + }); + ssx::spawn_with_gate(_gate, [this] { return reconciliation_loop(); }); co_return; @@ -117,7 +101,7 @@ void reconciler::attach_partition( return; } const auto& ntp = partition->ntp(); - vlog(lg.info, "Reconciler is attaching cloud partition {}", ntp); + vlog(rlog.debug, "Attaching partition {}", ntp); auto attached = ss::make_lw_shared(partition); auto res = _partitions.try_emplace(ntp, std::move(attached)); vassert(res.second, "Double registration of ntp {}", ntp); @@ -125,7 +109,7 @@ void reconciler::attach_partition( void reconciler::detach_partition(const model::ntp& ntp) { if (auto it = _partitions.find(ntp); it != _partitions.end()) { - vlog(lg.info, "Reconciler is detaching partition {}", ntp); + vlog(rlog.debug, "Detaching partition {}", ntp); /* * This upcall doesn't synchronize with the rest of the reconciler, * which means that once a reference to an attached partition is held, @@ -136,21 +120,12 @@ void reconciler::detach_partition(const model::ntp& ntp) { } } -void reconciler::object::add(range range, const attached_partition& partition) { - vassert(!range.data.empty(), "cannot add an empty range to object"); - const auto physical_offset_start = data.size_bytes(); - data.append(std::move(range.data)); - const auto physical_offset_end = data.size_bytes(); - - ranges.emplace_back( - partition, physical_offset_start, physical_offset_end, range.info); -} - ss::future<> reconciler::reconciliation_loop() { /* - * polling is not particularly efficient, and in practice, we'll probably + * Polling is not particularly efficient, and in practice, we'll probably * want to look into receiving upcalls from partitions announcing that new * data is available. + * TODO: Investigate performance of polling and alternatives to polling. */ constexpr std::chrono::seconds poll_frequency(10); @@ -159,16 +134,21 @@ ss::future<> reconciler::reconciliation_loop() { co_await _control_sem.wait( poll_frequency, std::max(_control_sem.current(), size_t(1))); } catch (const ss::semaphore_timed_out&) { - // time to do some work + // Time to do some work. } + vlog( + rlog.debug, + "Reconciliation loop tick with {} attached partitions", + _partitions.size()); + try { co_await reconcile(); } catch (...) { const auto is_shutdown = ssx::is_shutdown_exception( std::current_exception()); vlogl( - lg, + rlog, is_shutdown ? ss::log_level::debug : ss::log_level::info, "Recoverable error during reconciliation: {}", std::current_exception()); @@ -177,25 +157,41 @@ ss::future<> reconciler::reconciliation_loop() { } ss::future<> reconciler::reconcile() { + // Build object. auto object = co_await build_object(); if (!object.has_value()) { + vlog(rlog.debug, "No object to upload, skipping"); co_return; } - auto path = l1::object_path_factory::level_one_path(l1::create_object_id()); - auto result = co_await upload_object(path, std::move(object->data)); - if (result != cloud_io::upload_result::success) { - vlog(lg.info, "Failed to upload L1 object: {}", result); + + vlog( + rlog.debug, + "Built L1 object from {} partitions", + object->partitions.size()); + + // Upload object. + auto object_id = l1::create_object_id(); + auto upload_result = co_await _l1_io->put_object( + object_id, object->staging_file.get(), &_as); + co_await object->staging_file->remove(); + if (!upload_result.has_value()) { + vlog( + rlog.warn, + "Failed to upload L1 object: {}", + static_cast(upload_result.error())); co_return; } + vlog(rlog.debug, "Successfully uploaded L1 object: {}", object_id); - // commit for each partition represented in the uploaded object - for (const auto& range : object->ranges) { - co_await commit_object(path, range); + // Commit object. + for (const auto& partition_info : object->partitions) { + co_await commit_object(object_id, partition_info); } } -ss::future> reconciler::build_object() { - // light-weight copy for stable iteration +ss::future> reconciler::build_object() { + // Copy the leader partition information in case of + // mid-reconciliation unregistration. std::vector partitions; for (const auto& p : _partitions) { if (p.second->partition->is_leader()) { @@ -203,67 +199,103 @@ ss::future> reconciler::build_object() { } } - // avoid starving partitions + if (partitions.empty()) { + vlog(rlog.debug, "No leader partitions to reconcile"); + co_return std::nullopt; + } + + // Shuffle to avoid starving partitions. + // TODO: Investigate how to divide work between partitions with + // different throughput. std::shuffle( partitions.begin(), partitions.end(), random_generators::internal::gen); - object object; + auto staging_file_result = co_await _l1_io->create_tmp_file(); + if (!staging_file_result.has_value()) { + vlog( + rlog.warn, + "Failed to create staging file: {}", + static_cast(staging_file_result.error())); + co_return std::nullopt; + } + + built_object result; + result.staging_file = std::move(staging_file_result.value()); + + auto output_stream = co_await result.staging_file->output_stream(); + auto builder = l1::object_builder::create( + std::move(output_stream), + // TODO: The default is OK? + l1::object_builder::options{}); + auto size_budget = max_object_size; for (const auto& partition : partitions) { + vlog( + rlog.debug, + "Processing partition {} with LRO {}", + partition->partition->ntp(), + partition->lro); + auto reader = co_await make_reader(partition, size_budget); - auto range = co_await std::move(reader).consume( - range_batch_consumer{}, model::no_timeout); - if (range.has_value()) { - object.add(std::move(*range), partition); - size_budget -= std::min(object.data.size_bytes(), size_budget); + reconciliation_consumer consumer( + builder.get(), partition->partition->ntp()); + auto metadata = co_await std::move(reader).consume( + std::move(consumer), model::no_timeout); + + if (!metadata.has_value()) { + vlog( + rlog.debug, + "No batches found for partition {}", + partition->partition->ntp()); + continue; } + + vlog( + rlog.debug, + "Adding partition {} to L1 object with offsets {}~{}", + partition->partition->ntp(), + metadata->base_offset, + metadata->last_offset); + result.partitions.emplace_back(partition, std::move(metadata.value())); + + auto current_size = co_await result.staging_file->size(); + if (current_size >= max_object_size) { + break; + } + size_budget = max_object_size - current_size; } - if (object.data.empty()) { + if (result.partitions.empty()) { + co_await builder->close(); + co_await result.staging_file->remove(); co_return std::nullopt; } - co_return object; -} + result.object_info = co_await builder->finish(); + co_await builder->close(); -ss::future reconciler::upload_object( - cloud_storage_clients::object_key key, iobuf payload) { - retry_chain_node rtc( - _as, - ss::lowres_clock::now() + std::chrono::seconds(20), - std::chrono::seconds(1)); - - co_return co_await _cloud_io->local().upload_object({ - .transfer_details = { - .bucket = _bucket, - .key = key, - .parent_rtc = rtc, - }, - .display_str = "l1_object", - .payload = std::move(payload), - }); + co_return result; } ss::future<> reconciler::commit_object( - const cloud_storage_clients::object_key& key, - const object_range_info& range) { + const l1::object_id& object_id, const partition_commit_info& partition_info) { /* - * TODO register the L1 object with L1 metastore. + * TODO register the L1 object with L1 metastore using object_id and + * partition_info. */ - const auto& part = range.partition->partition; + const auto& part = partition_info.partition->partition; + const auto& metadata = partition_info.metadata; - range.partition->lro = range.info.last_offset + kafka::offset(1); + partition_info.partition->lro = metadata.last_offset + kafka::offset(1); vlog( - lg.info, - "Committed overlay to {} for {} phy {}~{} log {}~{}. New LRO {}", - key, + rlog.debug, + "Committed overlay to object {} for {} log {}~{}. New LRO {}", + object_id, part->ntp(), - range.physical_offset_start, - range.physical_offset_end, - range.info.base_offset, - range.info.last_offset, - range.partition->lro); + metadata.base_offset, + metadata.last_offset, + partition_info.partition->lro); co_return; } @@ -276,7 +308,7 @@ reconciler::make_reader(const attached_partition& partition, size_t max_bytes) { auto effective_start = co_await fe.sync_effective_start(5s); if (!effective_start.has_value()) { vlog( - lg.info, + rlog.warn, "Error querying partition start offset ({}): {}", cluster_partition->ntp(), effective_start.error()); @@ -289,7 +321,7 @@ reconciler::make_reader(const attached_partition& partition, size_t max_bytes) { auto maybe_lso = fe.last_stable_offset(); if (!maybe_lso.has_value()) { vlog( - lg.info, + rlog.warn, "Error querying partition LSO ({}): {}", cluster_partition->ntp(), maybe_lso.error()); diff --git a/src/v/cloud_topics/reconciler/reconciler.h b/src/v/cloud_topics/reconciler/reconciler.h index e8e864eb5c327..df8be9b25d2f2 100644 --- a/src/v/cloud_topics/reconciler/reconciler.h +++ b/src/v/cloud_topics/reconciler/reconciler.h @@ -12,9 +12,10 @@ #include "absl/container/node_hash_map.h" #include "base/seastarx.h" -#include "cloud_io/remote.h" -#include "cloud_storage_clients/types.h" -#include "cloud_topics/reconciler/range_batch_consumer.h" +#include "cloud_topics/level_one/common/file_io.h" +#include "cloud_topics/level_one/common/object.h" +#include "cloud_topics/level_one/common/object_id.h" +#include "cloud_topics/reconciler/reconciliation_consumer.h" #include "cluster/notification.h" #include "cluster/partition.h" #include "cluster/partition_manager.h" @@ -24,6 +25,7 @@ #include #include +#include #include namespace cloud_topics { @@ -41,10 +43,7 @@ namespace cloud_topics::reconciler { class reconciler { public: reconciler( - ss::sharded*, - ss::sharded*, - data_plane_api*, - std::optional = std::nullopt); + ss::sharded*, data_plane_api*, l1::file_io*); reconciler(const reconciler&) = delete; reconciler& operator=(const reconciler&) = delete; @@ -57,9 +56,9 @@ class reconciler { private: /* - * an attached partition is a partition that the reconciler is tracking and - * periodically processing. partitions are attached/detatched via upcalls - * from the cluster module. the reconciler operates on the leaders of + * An attached partition is a partition that the reconciler is tracking and + * periodically processing. Partitions are attached/detached via upcalls + * from the cluster module. The reconciler operates on the leaders of * partitions with affinity to the local shard. */ struct attached_partition_info { @@ -70,16 +69,16 @@ class reconciler { ss::lw_shared_ptr partition; /* - * last reconciled offset. this forms the starting offset when querying + * Last reconciled offset. this forms the starting offset when querying * the partition for new data. In later versions of the system this will * be stored in and queried from the partition itself. + * TODO: Rename this, and set it using the L0 LRO and the L1 metastore. */ kafka::offset lro; }; using attached_partition = ss::lw_shared_ptr; - // currently attached partitions absl::node_hash_map _partitions; void attach_partition(ss::lw_shared_ptr); @@ -89,62 +88,50 @@ class reconciler { cluster::notification_id_type _unmanage_notify_handle; private: - static constexpr size_t max_object_size = 4_MiB; + static constexpr size_t max_object_size = 64_MiB; /* - * metadata about a materialized range of batches stored in an L1 object. - * after an object is created and uploaded, this metadata is used to drive - * the creation and replication of overlay batches to each partition. - * - * partition - the source partition - * physical extent - position within the object - * range info - additional metadata (e.g. kafka offset extent) + * Metadata about a partition in an L1 object, used for committing. + * TODO: Update to commit using the L1 metastore. */ - struct object_range_info { + struct partition_commit_info { attached_partition partition; - uint64_t physical_offset_start; - uint64_t physical_offset_end; - range_info info; + partition_metadata metadata; }; /* - * a staged / materialized L1 object. - * - * data - the payload - * ranges - metadata about each range in the payload + * An L1 object built using object_builder with associated partition + * metadata. */ - struct object { - iobuf data; - chunked_vector ranges; - - // add a range from the given partition - void add(range, const attached_partition&); + struct built_object { + l1::object_builder::object_info object_info; + std::unique_ptr staging_file; + chunked_vector partitions; }; - // top-level background worker that drives reconciliation + // Top-level background worker that drives reconciliation. ss::future<> reconciliation_loop(); ssx::semaphore _control_sem{0, "reconciler::semaphore"}; /* - * one round of reconciliation in which data from one or more partitions may - * be reconciled into an L1 object. operates on the set of currently + * One round of reconciliation in which data from one or more partitions may + * be reconciled into an L1 object. Operates on the set of currently * attached partitions. */ ss::future<> reconcile(); /* - * reconciliation is a three step process. first an L1 object is built, then - * it is uploaded to cloud storage, and finally its committed. + * Reconciliation is a three step process. First, an L1 object is built, + * then it is uploaded to cloud storage, and finally it is committed. + * TODO: This process occurs for each domain, once using the metastore. */ - ss::future> build_object(); - ss::future - upload_object(cloud_storage_clients::object_key, iobuf); - ss::future<> commit_object( - const cloud_storage_clients::object_key&, const object_range_info&); + ss::future> build_object(); + ss::future<> + commit_object(const l1::object_id&, const partition_commit_info&); /* - * build a partition reader that returns batches to be reconciled. reading - * will start from the last reconcilied offset. if there is no data that + * Build a partition reader that returns batches to be reconciled. Reading + * will start from the last reconcilied offset. If there is no data that * needs to be reconciled then an empty reader is returned. */ ss::future @@ -152,9 +139,8 @@ class reconciler { private: ss::sharded* _partition_manager; - ss::sharded* _cloud_io; data_plane_api* _data_plane; - cloud_storage_clients::bucket_name _bucket; + l1::file_io* _l1_io; ss::gate _gate; ss::abort_source _as; }; diff --git a/src/v/cloud_topics/reconciler/reconciliation_consumer.cc b/src/v/cloud_topics/reconciler/reconciliation_consumer.cc new file mode 100644 index 0000000000000..adda2aaa4e0f2 --- /dev/null +++ b/src/v/cloud_topics/reconciler/reconciliation_consumer.cc @@ -0,0 +1,62 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "cloud_topics/reconciler/reconciliation_consumer.h" + +#include "model/timestamp.h" + +namespace cloud_topics::reconciler { + +constexpr kafka::offset offset_unset{-1}; + +reconciliation_consumer::reconciliation_consumer( + l1::object_builder* builder, model::ntp ntp) + : _builder(builder) + , _ntp(std::move(ntp)) + , _metadata{ + .base_offset = offset_unset, + .last_offset = offset_unset, + .base_timestamp = model::timestamp::max(), + .last_timestamp = model::timestamp::min()} {} + +ss::future +reconciliation_consumer::operator()(model::record_batch batch) { + if (_metadata.base_offset == offset_unset) { + _metadata.base_offset = model::offset_cast(batch.base_offset()); + co_await _builder->start_partition(_ntp); + } + + // NOTE: we only have data batches here so it's safe + // to use timestamps without checking the batch type. + _metadata.base_timestamp = std::min( + batch.header().first_timestamp, _metadata.base_timestamp); + _metadata.last_timestamp = std::max( + batch.header().max_timestamp, _metadata.last_timestamp); + _metadata.last_offset = model::offset_cast(batch.last_offset()); + + if (!_metadata.terms.contains(batch.term())) { + _metadata.terms.insert( + std::make_pair( + batch.term(), model::offset_cast(batch.base_offset()))); + } + + co_await _builder->add_batch(std::move(batch)); + + co_return ss::stop_iteration::no; +} + +std::optional reconciliation_consumer::end_of_stream() { + if (_metadata.base_offset != offset_unset) { + return _metadata; + } + return std::nullopt; +} + +} // namespace cloud_topics::reconciler diff --git a/src/v/cloud_topics/reconciler/range_batch_consumer.h b/src/v/cloud_topics/reconciler/reconciliation_consumer.h similarity index 64% rename from src/v/cloud_topics/reconciler/range_batch_consumer.h rename to src/v/cloud_topics/reconciler/reconciliation_consumer.h index f172f653af177..b5f5ee2b862a7 100644 --- a/src/v/cloud_topics/reconciler/range_batch_consumer.h +++ b/src/v/cloud_topics/reconciler/reconciliation_consumer.h @@ -11,7 +11,7 @@ #pragma once #include "absl/container/btree_map.h" -#include "bytes/iobuf.h" +#include "cloud_topics/level_one/common/object.h" #include "model/fundamental.h" #include "model/record.h" #include "model/timestamp.h" @@ -23,38 +23,28 @@ namespace cloud_topics::reconciler { -/* - * metadata about a range of batches. - */ -struct range_info { +struct partition_metadata { kafka::offset base_offset; kafka::offset last_offset; model::timestamp base_timestamp; model::timestamp last_timestamp; - // 'range_info' is not aligned by term boundary so this - // map is used to track term changes absl::btree_map terms; }; -/* - * a materialized range of batches. - */ -struct range { - iobuf data; - range_info info; -}; - -/* - * Consumer that builds a range from a record batch reader. - */ -class range_batch_consumer { +/// Consumes record batches from a partition and writes them to an L1 object. +/// Produces metadata about the consumed range including offsets, timestamps, +/// and term transitions. +class reconciliation_consumer { public: + reconciliation_consumer(l1::object_builder* builder, model::ntp ntp); + ss::future operator()(model::record_batch); - std::optional end_of_stream(); + std::optional end_of_stream(); private: - range _range; - std::optional _base_offset; + l1::object_builder* _builder; + model::ntp _ntp; + partition_metadata _metadata; }; } // namespace cloud_topics::reconciler diff --git a/src/v/cloud_topics/reconciler/tests/BUILD b/src/v/cloud_topics/reconciler/tests/BUILD index ad40ad81db05d..283f7c93a4d93 100644 --- a/src/v/cloud_topics/reconciler/tests/BUILD +++ b/src/v/cloud_topics/reconciler/tests/BUILD @@ -1,17 +1,20 @@ load("//bazel:test.bzl", "redpanda_cc_gtest") redpanda_cc_gtest( - name = "range_batch_consumer_test", + name = "reconciliation_consumer_test", timeout = "short", srcs = [ - "range_batch_consumer_test.cc", + "reconciliation_consumer_test.cc", ], deps = [ - "//src/v/cloud_topics/reconciler:range_batch_consumer", + "//src/v/bytes:iostream", + "//src/v/cloud_topics/level_one/common:object", + "//src/v/cloud_topics/reconciler:reconciliation_consumer", "//src/v/model", "//src/v/storage:record_batch_builder", "//src/v/test_utils:gtest", "//src/v/test_utils:random_bytes", "@googletest//:gtest", + "@seastar", ], ) diff --git a/src/v/cloud_topics/reconciler/tests/range_batch_consumer_test.cc b/src/v/cloud_topics/reconciler/tests/range_batch_consumer_test.cc deleted file mode 100644 index b8806f2ffecae..0000000000000 --- a/src/v/cloud_topics/reconciler/tests/range_batch_consumer_test.cc +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2024 Redpanda Data, Inc. - * - * Licensed as a Redpanda Enterprise file under the Redpanda Community - * License (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md - */ -#include "cloud_topics/reconciler/range_batch_consumer.h" -#include "model/record_batch_reader.h" -#include "storage/record_batch_builder.h" -#include "test_utils/random_bytes.h" - -#include - -using consumer = cloud_topics::reconciler::range_batch_consumer; - -model::record_batch_reader make_reader( - int offset, int record_size, int num_batches, int records_per_batch) { - ss::chunked_fifo batches; - for (int i = 0; i < num_batches; i++) { - storage::record_batch_builder b( - model::record_batch_type::raft_data, model::offset(offset)); - for (int j = 0; j < records_per_batch; j++) { - b.add_raw_kv(tests::random_iobuf(record_size), iobuf()); - offset += 1; - } - batches.push_back(std::move(b).build()); - } - return model::make_chunked_memory_record_batch_reader(std::move(batches)); -} - -TEST(RangeBatchConsumer, EmptyReader) { - auto reader = model::make_empty_record_batch_reader(); - auto range = std::move(reader).consume(consumer{}, model::no_timeout).get(); - ASSERT_FALSE(range.has_value()); -} - -TEST(RangeBatchConsumer, MultipleBatchesRecords) { - size_t record_size = 100; - size_t base_offset = 11; - for (int i = 1; i < 4; i++) { - for (int j = 1; j < 2; j++) { - auto reader = make_reader(base_offset, record_size, i, j); - auto range - = std::move(reader).consume(consumer{}, model::no_timeout).get(); - ASSERT_TRUE(range.has_value()); - ASSERT_EQ(range->info.base_offset(), base_offset); - ASSERT_EQ(range->info.last_offset(), base_offset + (i * j) - 1); - } - } -} diff --git a/src/v/cloud_topics/reconciler/tests/reconciliation_consumer_test.cc b/src/v/cloud_topics/reconciler/tests/reconciliation_consumer_test.cc new file mode 100644 index 0000000000000..8e960d3754a15 --- /dev/null +++ b/src/v/cloud_topics/reconciler/tests/reconciliation_consumer_test.cc @@ -0,0 +1,84 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#include "bytes/iostream.h" +#include "cloud_topics/level_one/common/object.h" +#include "cloud_topics/reconciler/reconciliation_consumer.h" +#include "model/namespace.h" +#include "model/record_batch_reader.h" +#include "storage/record_batch_builder.h" +#include "test_utils/random_bytes.h" + +#include + +#include + +using consumer = cloud_topics::reconciler::reconciliation_consumer; +using namespace cloud_topics::l1; + +model::record_batch_reader make_reader( + int offset, int record_size, int num_batches, int records_per_batch) { + ss::chunked_fifo batches; + for (int i = 0; i < num_batches; i++) { + storage::record_batch_builder b( + model::record_batch_type::raft_data, model::offset(offset)); + for (int j = 0; j < records_per_batch; j++) { + b.add_raw_kv(tests::random_iobuf(record_size), iobuf()); + offset += 1; + } + batches.push_back(std::move(b).build()); + } + return model::make_chunked_memory_record_batch_reader(std::move(batches)); +} + +TEST(RangeBatchConsumer, EmptyReader) { + auto reader = model::make_empty_record_batch_reader(); + iobuf output; + auto builder = object_builder::create( + make_iobuf_ref_output_stream(output), object_builder::options{}); + auto _ = ss::defer([&builder] { builder->close().get(); }); + + model::ntp ntp( + model::ns("kimchi"), model::topic("taco"), model::partition_id(0)); + consumer c(builder.get(), ntp); + auto metadata + = std::move(reader).consume(std::move(c), model::no_timeout).get(); + ASSERT_FALSE(metadata.has_value()); +} + +TEST(RangeBatchConsumer, MultipleBatchesRecords) { + size_t record_size = 100; + size_t base_offset = 11; + for (int i = 1; i < 4; i++) { + for (int j = 1; j < 2; j++) { + auto reader = make_reader(base_offset, record_size, i, j); + iobuf output; + auto builder = object_builder::create( + make_iobuf_ref_output_stream(output), object_builder::options{}); + auto _ = ss::defer([&builder] { builder->close().get(); }); + + model::ntp ntp( + model::ns("kimchi"), + model::topic("taco"), + model::partition_id(0)); + consumer c(builder.get(), ntp); + auto metadata = std::move(reader) + .consume(std::move(c), model::no_timeout) + .get(); + ASSERT_TRUE(metadata.has_value()); + ASSERT_EQ(metadata->base_offset(), base_offset); + ASSERT_EQ(metadata->last_offset(), base_offset + (i * j) - 1); + + // Verify object was built + auto info = builder->finish().get(); + ASSERT_EQ(info.index.partitions.size(), 1); + ASSERT_EQ(info.index.partitions.begin()->first, ntp); + } + } +} diff --git a/src/v/config/node_config.h b/src/v/config/node_config.h index dd3795d36ed7d..580cbf44f6cac 100644 --- a/src/v/config/node_config.h +++ b/src/v/config/node_config.h @@ -150,6 +150,10 @@ struct node_config final : public config_store { return data_directory().path / "datalake_staging"; } + std::filesystem::path l1_staging_path() const { + return data_directory().path / "l1_staging"; + } + std::vector advertised_kafka_api() const { if (_advertised_kafka_api().empty()) { std::vector eps; From 9ee9297fc9cefdf3d73bbb1d082e0cc3ffb7ab6a Mon Sep 17 00:00:00 2001 From: Will Berkeley Date: Mon, 25 Aug 2025 14:11:02 -0700 Subject: [PATCH 2/6] cloud_topics/reconciler: Add metastore dependency This prepares the reconciler to use the metastore to manager L1 metadata. --- src/v/cloud_topics/BUILD | 1 + src/v/cloud_topics/app.cc | 16 +++++++++++----- .../cloud_topics/level_one/metastore/metastore.h | 2 ++ src/v/cloud_topics/reconciler/BUILD | 2 ++ src/v/cloud_topics/reconciler/reconciler.cc | 6 ++++-- src/v/cloud_topics/reconciler/reconciler.h | 7 ++++++- 6 files changed, 26 insertions(+), 8 deletions(-) diff --git a/src/v/cloud_topics/BUILD b/src/v/cloud_topics/BUILD index 2df9b53b7800f..17ee41fbfd877 100644 --- a/src/v/cloud_topics/BUILD +++ b/src/v/cloud_topics/BUILD @@ -97,6 +97,7 @@ redpanda_cc_library( "//src/v/cloud_topics/level_one/common:file_io", "//src/v/cloud_topics/level_one/domain:domain_supervisor", "//src/v/cloud_topics/level_one/metastore:frontend", + "//src/v/cloud_topics/level_one/metastore:replicated_metastore", "//src/v/cloud_topics/level_zero/common:extent_meta", "//src/v/cloud_topics/reconciler", "//src/v/cluster", diff --git a/src/v/cloud_topics/app.cc b/src/v/cloud_topics/app.cc index ba2c83e31aab7..4cf7497962943 100644 --- a/src/v/cloud_topics/app.cc +++ b/src/v/cloud_topics/app.cc @@ -13,6 +13,7 @@ #include "cloud_topics/cluster_services.h" #include "cloud_topics/data_plane_api.h" #include "cloud_topics/data_plane_impl.h" +#include "cloud_topics/level_one/metastore/replicated_metastore.h" #include "cluster/cluster_epoch_service.h" #include "cluster/controller.h" #include "config/node_config.h" @@ -61,11 +62,6 @@ ss::future<> app::construct( bucket, ss::sharded_parameter([&cloud_cache] { return &cloud_cache->local(); })); - co_await construct_service( - reconciler, - partition_mgr, - data_plane.get(), - ss::sharded_parameter([this] { return &l1_io.local(); })); co_await construct_service(domain_supervisor, controller); co_await construct_service( l1_metastore_fe, @@ -75,6 +71,16 @@ ss::future<> app::construct( shard_table, connection_cache, &domain_supervisor); + + co_await construct_service( + reconciler, + partition_mgr, + data_plane.get(), + ss::sharded_parameter([this] { return &l1_io.local(); }), + ss::sharded_parameter([this] { + return std::make_unique( + l1_metastore_fe.local()); + })); } ss::future<> app::start() { diff --git a/src/v/cloud_topics/level_one/metastore/metastore.h b/src/v/cloud_topics/level_one/metastore/metastore.h index 16d464ae6422a..fd045cbd5597e 100644 --- a/src/v/cloud_topics/level_one/metastore/metastore.h +++ b/src/v/cloud_topics/level_one/metastore/metastore.h @@ -49,6 +49,8 @@ namespace cloud_topics::l1 { // side effects. As such, callers can think of this interface as thread safe. class metastore { public: + virtual ~metastore() = default; + enum class errc { missing_ntp, invalid_request, diff --git a/src/v/cloud_topics/reconciler/BUILD b/src/v/cloud_topics/reconciler/BUILD index 62d75965da1bb..be47e41be0c55 100644 --- a/src/v/cloud_topics/reconciler/BUILD +++ b/src/v/cloud_topics/reconciler/BUILD @@ -42,6 +42,8 @@ redpanda_cc_library( "//src/v/cloud_topics/level_one/common:object", "//src/v/cloud_topics/level_one/common:object_id", "//src/v/cloud_topics/level_one/common:object_utils", + "//src/v/cloud_topics/level_one/metastore", + "//src/v/cloud_topics/level_one/metastore:replicated_metastore", "//src/v/cloud_topics/level_zero/stm:ctp_stm_api", "//src/v/cluster", "//src/v/cluster:notification", diff --git a/src/v/cloud_topics/reconciler/reconciler.cc b/src/v/cloud_topics/reconciler/reconciler.cc index 900a5cc0bc1f9..455ae288d734d 100644 --- a/src/v/cloud_topics/reconciler/reconciler.cc +++ b/src/v/cloud_topics/reconciler/reconciler.cc @@ -57,10 +57,12 @@ namespace cloud_topics::reconciler { reconciler::reconciler( ss::sharded* pm, data_plane_api* data_plane, - l1::file_io* l1_io) + l1::file_io* l1_io, + std::unique_ptr metastore) : _partition_manager(pm) , _data_plane(data_plane) - , _l1_io(l1_io) {} + , _l1_io(l1_io) + , _metastore(std::move(metastore)) {} ss::future<> reconciler::start() { _unmanage_notify_handle diff --git a/src/v/cloud_topics/reconciler/reconciler.h b/src/v/cloud_topics/reconciler/reconciler.h index df8be9b25d2f2..9bf5c010327a3 100644 --- a/src/v/cloud_topics/reconciler/reconciler.h +++ b/src/v/cloud_topics/reconciler/reconciler.h @@ -15,6 +15,7 @@ #include "cloud_topics/level_one/common/file_io.h" #include "cloud_topics/level_one/common/object.h" #include "cloud_topics/level_one/common/object_id.h" +#include "cloud_topics/level_one/metastore/metastore.h" #include "cloud_topics/reconciler/reconciliation_consumer.h" #include "cluster/notification.h" #include "cluster/partition.h" @@ -43,7 +44,10 @@ namespace cloud_topics::reconciler { class reconciler { public: reconciler( - ss::sharded*, data_plane_api*, l1::file_io*); + ss::sharded*, + data_plane_api*, + l1::file_io*, + std::unique_ptr); reconciler(const reconciler&) = delete; reconciler& operator=(const reconciler&) = delete; @@ -141,6 +145,7 @@ class reconciler { ss::sharded* _partition_manager; data_plane_api* _data_plane; l1::file_io* _l1_io; + std::unique_ptr _metastore; ss::gate _gate; ss::abort_source _as; }; From 4b8606e2e90f500c768a756776146e8b1b915abb Mon Sep 17 00:00:00 2001 From: Will Berkeley Date: Mon, 25 Aug 2025 15:39:15 -0700 Subject: [PATCH 3/6] cloud_topics/reconciler: Add machinery to get topic ids The metastore speaks topic ID, so the reconciler needs to. --- src/v/cloud_topics/app.cc | 3 ++- src/v/cloud_topics/reconciler/reconciler.cc | 26 +++++++++++++++++++-- src/v/cloud_topics/reconciler/reconciler.h | 8 ++++++- 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/src/v/cloud_topics/app.cc b/src/v/cloud_topics/app.cc index 4cf7497962943..862ae98994b93 100644 --- a/src/v/cloud_topics/app.cc +++ b/src/v/cloud_topics/app.cc @@ -80,7 +80,8 @@ ss::future<> app::construct( ss::sharded_parameter([this] { return std::make_unique( l1_metastore_fe.local()); - })); + }), + ss::sharded_parameter([&metadata_cache] { return &metadata_cache->local(); })); } ss::future<> app::start() { diff --git a/src/v/cloud_topics/reconciler/reconciler.cc b/src/v/cloud_topics/reconciler/reconciler.cc index 455ae288d734d..2ff8817e3b7bd 100644 --- a/src/v/cloud_topics/reconciler/reconciler.cc +++ b/src/v/cloud_topics/reconciler/reconciler.cc @@ -58,11 +58,13 @@ reconciler::reconciler( ss::sharded* pm, data_plane_api* data_plane, l1::file_io* l1_io, - std::unique_ptr metastore) + std::unique_ptr metastore, + cluster::metadata_cache* metadata_cache) : _partition_manager(pm) , _data_plane(data_plane) , _l1_io(l1_io) - , _metastore(std::move(metastore)) {} + , _metastore(std::move(metastore)) + , _metadata_cache(metadata_cache) {} ss::future<> reconciler::start() { _unmanage_notify_handle @@ -356,4 +358,24 @@ reconciler::make_reader(const attached_partition& partition, size_t max_bytes) { std::move(tracker), std::move(reader.reader)); } +std::expected +reconciler::get_topic_id_partition(const model::ntp& ntp) const { + auto topic_ns = model::topic_namespace{ntp.ns, ntp.tp.topic}; + auto topic_cfg = _metadata_cache->get_topic_cfg(topic_ns); + if (!topic_cfg.has_value()) { + return std::unexpected( + fmt::format( + "Failed to get topic configuration for topic {}", + topic_ns)); + } + + if (!topic_cfg->tp_id.has_value()) { + return std::unexpected( + fmt::format("Topic {} does not have a topic_id", ntp.tp.topic)); + } + + model::topic_id tid = topic_cfg->tp_id.value(); + return model::topic_id_partition{tid, ntp.tp.partition}; +} + } // namespace cloud_topics::reconciler diff --git a/src/v/cloud_topics/reconciler/reconciler.h b/src/v/cloud_topics/reconciler/reconciler.h index 9bf5c010327a3..88f66b1dcbb5d 100644 --- a/src/v/cloud_topics/reconciler/reconciler.h +++ b/src/v/cloud_topics/reconciler/reconciler.h @@ -17,6 +17,7 @@ #include "cloud_topics/level_one/common/object_id.h" #include "cloud_topics/level_one/metastore/metastore.h" #include "cloud_topics/reconciler/reconciliation_consumer.h" +#include "cluster/metadata_cache.h" #include "cluster/notification.h" #include "cluster/partition.h" #include "cluster/partition_manager.h" @@ -47,7 +48,8 @@ class reconciler { ss::sharded*, data_plane_api*, l1::file_io*, - std::unique_ptr); + std::unique_ptr, + cluster::metadata_cache*); reconciler(const reconciler&) = delete; reconciler& operator=(const reconciler&) = delete; @@ -141,11 +143,15 @@ class reconciler { ss::future make_reader(const attached_partition&, size_t); + std::expected + get_topic_id_partition(const model::ntp&) const; + private: ss::sharded* _partition_manager; data_plane_api* _data_plane; l1::file_io* _l1_io; std::unique_ptr _metastore; + cluster::metadata_cache* _metadata_cache; ss::gate _gate; ss::abort_source _as; }; From 692a313d8b343e05a00b430a7486cbfeae2c9cc5 Mon Sep 17 00:00:00 2001 From: Will Berkeley Date: Mon, 25 Aug 2025 18:39:44 -0700 Subject: [PATCH 4/6] cloud_topics/reconciler: integrate metadata builder in object building The metastore may partition objects into separate domains. The reconciler must build objects according to this partitioning, so the metastore metadata building process must be used to coordinate the object building process. --- src/v/cloud_topics/app.cc | 3 +- src/v/cloud_topics/reconciler/BUILD | 1 + src/v/cloud_topics/reconciler/reconciler.cc | 239 +++++++++++++------- src/v/cloud_topics/reconciler/reconciler.h | 42 +++- 4 files changed, 204 insertions(+), 81 deletions(-) diff --git a/src/v/cloud_topics/app.cc b/src/v/cloud_topics/app.cc index 862ae98994b93..423f9c16451bf 100644 --- a/src/v/cloud_topics/app.cc +++ b/src/v/cloud_topics/app.cc @@ -81,7 +81,8 @@ ss::future<> app::construct( return std::make_unique( l1_metastore_fe.local()); }), - ss::sharded_parameter([&metadata_cache] { return &metadata_cache->local(); })); + ss::sharded_parameter( + [&metadata_cache] { return &metadata_cache->local(); })); } ss::future<> app::start() { diff --git a/src/v/cloud_topics/reconciler/BUILD b/src/v/cloud_topics/reconciler/BUILD index be47e41be0c55..028f47084c9fa 100644 --- a/src/v/cloud_topics/reconciler/BUILD +++ b/src/v/cloud_topics/reconciler/BUILD @@ -52,6 +52,7 @@ redpanda_cc_library( "//src/v/model", "//src/v/random:generators", "@abseil-cpp//absl/container:btree", + "@abseil-cpp//absl/container:flat_hash_map", "@abseil-cpp//absl/container:node_hash_map", "@seastar", ], diff --git a/src/v/cloud_topics/reconciler/reconciler.cc b/src/v/cloud_topics/reconciler/reconciler.cc index 2ff8817e3b7bd..c0e4e0232cfb1 100644 --- a/src/v/cloud_topics/reconciler/reconciler.cc +++ b/src/v/cloud_topics/reconciler/reconciler.cc @@ -161,39 +161,44 @@ ss::future<> reconciler::reconciliation_loop() { } ss::future<> reconciler::reconcile() { - // Build object. - auto object = co_await build_object(); - if (!object.has_value()) { - vlog(rlog.debug, "No object to upload, skipping"); + auto metadata_builder = _metastore->object_builder(); + + auto objects = co_await build_objects(metadata_builder.get()); + if (objects.empty()) { + vlog(rlog.debug, "No objects to upload"); co_return; } - vlog( - rlog.debug, - "Built L1 object from {} partitions", - object->partitions.size()); - - // Upload object. - auto object_id = l1::create_object_id(); - auto upload_result = co_await _l1_io->put_object( - object_id, object->staging_file.get(), &_as); - co_await object->staging_file->remove(); - if (!upload_result.has_value()) { + for (auto& object : objects) { vlog( - rlog.warn, - "Failed to upload L1 object: {}", - static_cast(upload_result.error())); - co_return; - } - vlog(rlog.debug, "Successfully uploaded L1 object: {}", object_id); + rlog.debug, + "About to upload L1 object {} built from {} partitions", + object.object_id, + object.partitions.size()); + + // Upload. + auto upload_result = co_await _l1_io->put_object( + object.object_id, object.staging_file.get(), &_as); + co_await object.staging_file->remove(); + if (!upload_result.has_value()) { + vlog( + rlog.warn, + "Failed to upload L1 object: {}", + static_cast(upload_result.error())); + continue; + } + vlog(rlog.debug, "Uploaded L1 object {}", object.object_id); - // Commit object. - for (const auto& partition_info : object->partitions) { - co_await commit_object(object_id, partition_info); + // Commit. + // TODO: This is the old way, not the metastore way. + for (const auto& partition_info : object.partitions) { + co_await commit_object(object.object_id, partition_info); + } } } -ss::future> reconciler::build_object() { +ss::future> reconciler::build_objects( + l1::metastore::object_metadata_builder* metadata_builder) { // Copy the leader partition information in case of // mid-reconciliation unregistration. std::vector partitions; @@ -205,7 +210,7 @@ ss::future> reconciler::build_object() { if (partitions.empty()) { vlog(rlog.debug, "No leader partitions to reconcile"); - co_return std::nullopt; + co_return chunked_vector{}; } // Shuffle to avoid starving partitions. @@ -214,71 +219,61 @@ ss::future> reconciler::build_object() { std::shuffle( partitions.begin(), partitions.end(), random_generators::internal::gen); - auto staging_file_result = co_await _l1_io->create_tmp_file(); - if (!staging_file_result.has_value()) { + auto contexts_result = co_await setup_contexts( + metadata_builder, partitions); + if (!contexts_result.has_value()) { vlog( - rlog.warn, - "Failed to create staging file: {}", - static_cast(staging_file_result.error())); - co_return std::nullopt; + rlog.error, "Failed to setup contexts: {}", contexts_result.error()); + co_return chunked_vector{}; } - built_object result; - result.staging_file = std::move(staging_file_result.value()); - - auto output_stream = co_await result.staging_file->output_stream(); - auto builder = l1::object_builder::create( - std::move(output_stream), - // TODO: The default is OK? - l1::object_builder::options{}); + auto contexts = std::move(contexts_result.value()); - auto size_budget = max_object_size; for (const auto& partition : partitions) { - vlog( - rlog.debug, - "Processing partition {} with LRO {}", - partition->partition->ntp(), - partition->lro); - - auto reader = co_await make_reader(partition, size_budget); - reconciliation_consumer consumer( - builder.get(), partition->partition->ntp()); - auto metadata = co_await std::move(reader).consume( - std::move(consumer), model::no_timeout); + auto tidp_it = contexts.ntp_to_tidp.find(partition->partition->ntp()); + vassert( + tidp_it != contexts.ntp_to_tidp.end(), + "No topic_id_partition found for {}", + partition->partition->ntp()); + + auto [_, tidp] = *tidp_it; + auto object_id_it = contexts.tidp_to_object.find(tidp); + vassert( + object_id_it != contexts.tidp_to_object.end(), + "No object context found for {}", + tidp); + + auto [_unused, object_id] = *object_id_it; + auto& ctx = contexts.objects[object_id]; + auto metadata = co_await add_partition_to_object( + ctx.builder.get(), partition, ctx.remaining_budget); if (!metadata.has_value()) { - vlog( - rlog.debug, - "No batches found for partition {}", - partition->partition->ntp()); continue; } - vlog( - rlog.debug, - "Adding partition {} to L1 object with offsets {}~{}", - partition->partition->ntp(), - metadata->base_offset, - metadata->last_offset); - result.partitions.emplace_back(partition, std::move(metadata.value())); - - auto current_size = co_await result.staging_file->size(); - if (current_size >= max_object_size) { - break; - } - size_budget = max_object_size - current_size; - } + ctx.result.partitions.emplace_back( + partition, std::move(metadata.value())); - if (result.partitions.empty()) { - co_await builder->close(); - co_await result.staging_file->remove(); - co_return std::nullopt; + auto current_size = co_await ctx.result.staging_file->size(); + ctx.remaining_budget = max_object_size - current_size; } - result.object_info = co_await builder->finish(); - co_await builder->close(); + chunked_vector objects; + for (auto& [object_id, ctx] : contexts.objects) { + if (ctx.result.partitions.empty()) { + co_await ctx.builder->close(); + co_await ctx.result.staging_file->remove(); + continue; + } + + ctx.result.object_info = co_await ctx.builder->finish(); + co_await ctx.builder->close(); + + objects.emplace_back(std::move(ctx.result)); + } - co_return result; + co_return objects; } ss::future<> reconciler::commit_object( @@ -358,6 +353,93 @@ reconciler::make_reader(const attached_partition& partition, size_t max_bytes) { std::move(tracker), std::move(reader.reader)); } +ss::future> +reconciler::add_partition_to_object( + l1::object_builder* builder, + const attached_partition& partition, + size_t size_budget) { + vlog( + rlog.debug, + "Processing partition {} with LRO {}", + partition->partition->ntp(), + partition->lro); + + auto reader = co_await make_reader(partition, size_budget); + reconciliation_consumer consumer(builder, partition->partition->ntp()); + auto metadata = co_await std::move(reader).consume( + std::move(consumer), model::no_timeout); + + if (!metadata.has_value()) { + vlog( + rlog.debug, + "No batches found for partition {}", + partition->partition->ntp()); + co_return std::nullopt; + } + + vlog( + rlog.debug, + "Adding partition {} to L1 object with offsets {}~{}", + partition->partition->ntp(), + metadata->base_offset, + metadata->last_offset); + + co_return metadata.value(); +} + +ss::future> +reconciler::setup_contexts( + l1::metastore::object_metadata_builder* metadata_builder, + const std::vector& partitions) { + build_contexts contexts; + + for (const auto& partition : partitions) { + // Get topic_id_partition for this partition + auto tidp_result = get_topic_id_partition(partition->partition->ntp()); + if (!tidp_result.has_value()) { + co_return std::unexpected( + fmt::format( + "Failed to get topic_id_partition for {}: {}", + partition->partition->ntp(), + tidp_result.error())); + } + + auto tidp = tidp_result.value(); + + // Get or create object for this partition + auto object_id = metadata_builder->get_or_create_object_for(tidp); + + // Store the mappings + contexts.ntp_to_tidp[partition->partition->ntp()] = tidp; + contexts.tidp_to_object[tidp] = object_id; + + // Create build context if this is a new object + if (contexts.objects.find(object_id) == contexts.objects.end()) { + auto staging_file_result = co_await _l1_io->create_tmp_file(); + if (!staging_file_result.has_value()) { + co_return std::unexpected( + fmt::format( + "Failed to create staging file: {}", + static_cast(staging_file_result.error()))); + } + + object_build_context ctx; + ctx.remaining_budget = max_object_size; + ctx.result.object_id = object_id; + ctx.result.staging_file = std::move(staging_file_result.value()); + + auto output_stream + = co_await ctx.result.staging_file->output_stream(); + ctx.builder = l1::object_builder::create( + std::move(output_stream), l1::object_builder::options{}); + + contexts.objects[object_id] = std::move(ctx); + } + } + + co_return contexts; +} + std::expected reconciler::get_topic_id_partition(const model::ntp& ntp) const { auto topic_ns = model::topic_namespace{ntp.ns, ntp.tp.topic}; @@ -365,8 +447,7 @@ reconciler::get_topic_id_partition(const model::ntp& ntp) const { if (!topic_cfg.has_value()) { return std::unexpected( fmt::format( - "Failed to get topic configuration for topic {}", - topic_ns)); + "Failed to get topic configuration for topic {}", topic_ns)); } if (!topic_cfg->tp_id.has_value()) { diff --git a/src/v/cloud_topics/reconciler/reconciler.h b/src/v/cloud_topics/reconciler/reconciler.h index 88f66b1dcbb5d..5a4721f8fd7a8 100644 --- a/src/v/cloud_topics/reconciler/reconciler.h +++ b/src/v/cloud_topics/reconciler/reconciler.h @@ -10,6 +10,7 @@ #pragma once +#include "absl/container/flat_hash_map.h" #include "absl/container/node_hash_map.h" #include "base/seastarx.h" #include "cloud_topics/level_one/common/file_io.h" @@ -110,11 +111,34 @@ class reconciler { * metadata. */ struct built_object { + l1::object_id object_id; l1::object_builder::object_info object_info; std::unique_ptr staging_file; chunked_vector partitions; }; + /* + * Context for building a single L1 object, tracked during the + * build_objects process. + */ + struct object_build_context { + l1::object_id object_id; + std::unique_ptr builder; + size_t remaining_budget; + built_object result; + }; + + /* + * Overall context for building multiple L1 objects grouped by + * metastore partition. + */ + struct build_contexts { + absl::flat_hash_map objects; + absl::flat_hash_map + tidp_to_object; + absl::flat_hash_map ntp_to_tidp; + }; + // Top-level background worker that drives reconciliation. ss::future<> reconciliation_loop(); ssx::semaphore _control_sem{0, "reconciler::semaphore"}; @@ -131,7 +155,8 @@ class reconciler { * then it is uploaded to cloud storage, and finally it is committed. * TODO: This process occurs for each domain, once using the metastore. */ - ss::future> build_object(); + ss::future> + build_objects(l1::metastore::object_metadata_builder*); ss::future<> commit_object(const l1::object_id&, const partition_commit_info&); @@ -143,6 +168,21 @@ class reconciler { ss::future make_reader(const attached_partition&, size_t); + /* + * Add partition data to an L1 object builder. Returns the partition + * metadata if any batches were consumed, nullopt otherwise. + */ + ss::future> add_partition_to_object( + l1::object_builder*, const attached_partition&, size_t); + + /* + * Set up build contexts for all partitions, creating object builders + * and mapping partitions to objects via the metastore. + */ + ss::future> setup_contexts( + l1::metastore::object_metadata_builder*, + const std::vector&); + std::expected get_topic_id_partition(const model::ntp&) const; From 9ac4245358c52ecf3fe5374bef177bca82a982f4 Mon Sep 17 00:00:00 2001 From: Will Berkeley Date: Tue, 26 Aug 2025 10:12:14 -0700 Subject: [PATCH 5/6] cloud_topics/reconciler: commit object to the metastore This commits the objects to the metastore once they've been built and uploaded. --- src/v/cloud_topics/reconciler/reconciler.cc | 142 +++++++++++++++++++- src/v/cloud_topics/reconciler/reconciler.h | 18 +++ 2 files changed, 154 insertions(+), 6 deletions(-) diff --git a/src/v/cloud_topics/reconciler/reconciler.cc b/src/v/cloud_topics/reconciler/reconciler.cc index c0e4e0232cfb1..b7d4d563e89da 100644 --- a/src/v/cloud_topics/reconciler/reconciler.cc +++ b/src/v/cloud_topics/reconciler/reconciler.cc @@ -22,6 +22,8 @@ #include #include +#include + namespace { ss::logger rlog("reconciler"); @@ -188,9 +190,17 @@ ss::future<> reconciler::reconcile() { continue; } vlog(rlog.debug, "Uploaded L1 object {}", object.object_id); + } + + // Commit all objects to metastore. + l1::metastore::term_offset_map_t term_offset_map; + co_await populate_metastore_builder( + objects, metadata_builder.get(), term_offset_map); + co_await commit_to_metastore(std::move(metadata_builder), term_offset_map); - // Commit. - // TODO: This is the old way, not the metastore way. + // Update LRO for each partition. + // TODO: This is the old way, not the metastore way. + for (auto& object : objects) { for (const auto& partition_info : object.partitions) { co_await commit_object(object.object_id, partition_info); } @@ -276,12 +286,9 @@ ss::future> reconciler::build_objects( co_return objects; } +// TODO: Update this. It doesn't commit, it updates the LROs. ss::future<> reconciler::commit_object( const l1::object_id& object_id, const partition_commit_info& partition_info) { - /* - * TODO register the L1 object with L1 metastore using object_id and - * partition_info. - */ const auto& part = partition_info.partition->partition; const auto& metadata = partition_info.metadata; @@ -387,6 +394,129 @@ reconciler::add_partition_to_object( co_return metadata.value(); } +ss::future<> reconciler::populate_metastore_builder( + const chunked_vector& objects, + l1::metastore::object_metadata_builder* metadata_builder, + l1::metastore::term_offset_map_t& term_offset_map) { + for (const auto& object : objects) { + // Add all partitions for this object + for (const auto& partition_info : object.partitions) { + const auto& partition = partition_info.partition; + const auto& metadata = partition_info.metadata; + + // Get topic_id_partition for this partition + auto tidp_result = get_topic_id_partition( + partition->partition->ntp()); + if (!tidp_result.has_value()) { + vlog( + rlog.error, + "Failed to get topic_id_partition for {}: {}", + partition->partition->ntp(), + tidp_result.error()); + continue; + } + + auto tidp = tidp_result.value(); + + // Find the partition info in the footer for this NTP + const auto& footer = object.object_info.index; + auto [begin, end] = footer.partitions.equal_range( + partition->partition->ntp()); + + // Find the matching partition entry based on offset range + ssize_t pos = -1; + size_t size = 0; + auto matching_partition = std::ranges::find_if( + std::ranges::subrange(begin, end), + [&metadata](const auto& entry) { + const auto& footer_partition = entry.second; + return footer_partition.first_offset == metadata.base_offset + && footer_partition.last_offset + == metadata.last_offset; + }); + + vassert( + matching_partition != end, + "Failed to find partition {} with offsets [{}, {}] in footer", + partition->partition->ntp(), + metadata.base_offset, + metadata.last_offset); + + pos = matching_partition->second.file_position; + size = matching_partition->second.length; + + // Add partition metadata to metastore builder + l1::metastore::object_metadata::ntp_metadata ntp_meta; + ntp_meta.tidp = tidp; + ntp_meta.base_offset = metadata.base_offset; + ntp_meta.last_offset = metadata.last_offset; + ntp_meta.max_timestamp = metadata.last_timestamp; + ntp_meta.pos = pos; + ntp_meta.size = size; + + auto add_result = metadata_builder->add(object.object_id, ntp_meta); + if (!add_result.has_value()) { + vlog( + rlog.error, + "Failed to add partition {} to metastore builder: {}", + tidp, + add_result.error()); + continue; + } + + // Build term offset map for this partition + chunked_vector term_offsets; + for (const auto& [term, offset] : metadata.terms) { + term_offsets.emplace_back( + l1::metastore::term_offset{ + .term = term, .first_offset = offset}); + } + + if (!term_offsets.empty()) { + term_offset_map[tidp] = std::move(term_offsets); + } + } + + // Finish the object once after all partitions are added + auto finish_result = metadata_builder->finish( + object.object_id, object.object_info.footer_offset); + if (!finish_result.has_value()) { + vlog( + rlog.error, + "Failed to finish object {} in metastore builder: {}", + object.object_id, + finish_result.error()); + } + } + + co_return; +} + +ss::future<> reconciler::commit_to_metastore( + std::unique_ptr metadata_builder, + const l1::metastore::term_offset_map_t& term_offset_map) { + auto result = co_await _metastore->add_objects( + std::move(metadata_builder), term_offset_map); + if (!result.has_value()) { + vlog( + rlog.error, + "Failed to add objects to metastore: {}", + static_cast(result.error())); + co_return; + } + + // Handle any corrected offsets if needed + if (!result->corrected_next_offsets.empty()) { + vlog( + rlog.info, + "Metastore returned {} corrected next offsets", + result->corrected_next_offsets.size()); + // TODO: Update LRO tracking based on corrected offsets + } + + co_return; +} + ss::future> reconciler::setup_contexts( l1::metastore::object_metadata_builder* metadata_builder, diff --git a/src/v/cloud_topics/reconciler/reconciler.h b/src/v/cloud_topics/reconciler/reconciler.h index 5a4721f8fd7a8..a35b2151cc129 100644 --- a/src/v/cloud_topics/reconciler/reconciler.h +++ b/src/v/cloud_topics/reconciler/reconciler.h @@ -157,6 +157,24 @@ class reconciler { */ ss::future> build_objects(l1::metastore::object_metadata_builder*); + + /* + * Populate the metastore builder with metadata from all built objects + * and build the term offset map for committing to the metastore. + */ + ss::future<> populate_metastore_builder( + const chunked_vector&, + l1::metastore::object_metadata_builder*, + l1::metastore::term_offset_map_t&); + + /* + * Commit all objects to the metastore using the populated builder + * and term offset map. + */ + ss::future<> commit_to_metastore( + std::unique_ptr, + const l1::metastore::term_offset_map_t&); + ss::future<> commit_object(const l1::object_id&, const partition_commit_info&); From ccd30ce7cfb773586a6c419a21d3cc9252775db3 Mon Sep 17 00:00:00 2001 From: Will Berkeley Date: Tue, 26 Aug 2025 10:25:53 -0700 Subject: [PATCH 6/6] cloud_topics/reconciler: update LRO based on metastore response This updates the LRO for partitions based on the metastore's response to the add_objects call, which may contain correction to the LRO. --- src/v/cloud_topics/reconciler/reconciler.cc | 88 ++++++++++++--------- src/v/cloud_topics/reconciler/reconciler.h | 8 +- 2 files changed, 54 insertions(+), 42 deletions(-) diff --git a/src/v/cloud_topics/reconciler/reconciler.cc b/src/v/cloud_topics/reconciler/reconciler.cc index b7d4d563e89da..f861d09b34fb0 100644 --- a/src/v/cloud_topics/reconciler/reconciler.cc +++ b/src/v/cloud_topics/reconciler/reconciler.cc @@ -196,15 +196,8 @@ ss::future<> reconciler::reconcile() { l1::metastore::term_offset_map_t term_offset_map; co_await populate_metastore_builder( objects, metadata_builder.get(), term_offset_map); - co_await commit_to_metastore(std::move(metadata_builder), term_offset_map); - - // Update LRO for each partition. - // TODO: This is the old way, not the metastore way. - for (auto& object : objects) { - for (const auto& partition_info : object.partitions) { - co_await commit_object(object.object_id, partition_info); - } - } + co_await commit_to_metastore( + std::move(metadata_builder), term_offset_map, objects); } ss::future> reconciler::build_objects( @@ -286,26 +279,6 @@ ss::future> reconciler::build_objects( co_return objects; } -// TODO: Update this. It doesn't commit, it updates the LROs. -ss::future<> reconciler::commit_object( - const l1::object_id& object_id, const partition_commit_info& partition_info) { - const auto& part = partition_info.partition->partition; - const auto& metadata = partition_info.metadata; - - partition_info.partition->lro = metadata.last_offset + kafka::offset(1); - - vlog( - rlog.debug, - "Committed overlay to object {} for {} log {}~{}. New LRO {}", - object_id, - part->ntp(), - metadata.base_offset, - metadata.last_offset, - partition_info.partition->lro); - - co_return; -} - ss::future reconciler::make_reader(const attached_partition& partition, size_t max_bytes) { auto& cluster_partition = partition->partition; @@ -494,7 +467,8 @@ ss::future<> reconciler::populate_metastore_builder( ss::future<> reconciler::commit_to_metastore( std::unique_ptr metadata_builder, - const l1::metastore::term_offset_map_t& term_offset_map) { + const l1::metastore::term_offset_map_t& term_offset_map, + const chunked_vector& objects) { auto result = co_await _metastore->add_objects( std::move(metadata_builder), term_offset_map); if (!result.has_value()) { @@ -505,13 +479,53 @@ ss::future<> reconciler::commit_to_metastore( co_return; } - // Handle any corrected offsets if needed - if (!result->corrected_next_offsets.empty()) { - vlog( - rlog.info, - "Metastore returned {} corrected next offsets", - result->corrected_next_offsets.size()); - // TODO: Update LRO tracking based on corrected offsets + // Update LRO for all partitions based on metastore commit results + for (const auto& object : objects) { + for (const auto& partition_info : object.partitions) { + const auto& partition = partition_info.partition; + const auto& metadata = partition_info.metadata; + + // Get topic_id_partition for this partition + auto tidp_result = get_topic_id_partition( + partition->partition->ntp()); + if (!tidp_result.has_value()) { + vlog( + rlog.error, + "Failed to get topic_id_partition for {}: {}", + partition->partition->ntp(), + tidp_result.error()); + continue; + } + + auto tidp = tidp_result.value(); + + // Check if metastore returned a corrected offset for this partition + kafka::offset new_lro; + auto corrected_it = result->corrected_next_offsets.find(tidp); + if (corrected_it != result->corrected_next_offsets.end()) { + new_lro = corrected_it->second; + vlog( + rlog.info, + "Using corrected next offset for {}: {}", + partition->partition->ntp(), + new_lro); + } else { + // No correction, use one past the last offset we uploaded + new_lro = metadata.last_offset + kafka::offset(1); + } + + // Update the partition's LRO + partition->lro = new_lro; + + vlog( + rlog.debug, + "Updated LRO for {} to {} (offsets {}~{} in object {})", + partition->partition->ntp(), + new_lro, + metadata.base_offset, + metadata.last_offset, + object.object_id); + } } co_return; diff --git a/src/v/cloud_topics/reconciler/reconciler.h b/src/v/cloud_topics/reconciler/reconciler.h index a35b2151cc129..ecbdedade0faf 100644 --- a/src/v/cloud_topics/reconciler/reconciler.h +++ b/src/v/cloud_topics/reconciler/reconciler.h @@ -169,14 +169,12 @@ class reconciler { /* * Commit all objects to the metastore using the populated builder - * and term offset map. + * and term offset map, then update partition LROs based on the response. */ ss::future<> commit_to_metastore( std::unique_ptr, - const l1::metastore::term_offset_map_t&); - - ss::future<> - commit_object(const l1::object_id&, const partition_commit_info&); + const l1::metastore::term_offset_map_t&, + const chunked_vector&); /* * Build a partition reader that returns batches to be reconciled. Reading