From 3d7df77398986b6655ea8e1fdf2397c8f5ccd734 Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Thu, 5 Mar 2026 21:13:48 -0500 Subject: [PATCH 1/2] impl(bigtable): add ChannelUsage class --- google/cloud/bigtable/CMakeLists.txt | 2 + .../bigtable/bigtable_client_unit_tests.bzl | 1 + .../bigtable/google_cloud_cpp_bigtable.bzl | 1 + .../cloud/bigtable/internal/channel_usage.h | 123 ++++++++++++++++ .../bigtable/internal/channel_usage_test.cc | 131 ++++++++++++++++++ 5 files changed, 258 insertions(+) create mode 100644 google/cloud/bigtable/internal/channel_usage.h create mode 100644 google/cloud/bigtable/internal/channel_usage_test.cc diff --git a/google/cloud/bigtable/CMakeLists.txt b/google/cloud/bigtable/CMakeLists.txt index 81fcec2454f85..5bd03c98ccacc 100644 --- a/google/cloud/bigtable/CMakeLists.txt +++ b/google/cloud/bigtable/CMakeLists.txt @@ -165,6 +165,7 @@ add_library( internal/bigtable_tracing_stub.h internal/bulk_mutator.cc internal/bulk_mutator.h + internal/channel_usage.h internal/client_options_defaults.h internal/common_client.h internal/connection_refresh_state.cc @@ -444,6 +445,7 @@ if (BUILD_TESTING) internal/bigtable_channel_refresh_test.cc internal/bigtable_stub_factory_test.cc internal/bulk_mutator_test.cc + internal/channel_usage_test.cc internal/connection_refresh_state_test.cc internal/convert_policies_test.cc internal/crc32c_test.cc diff --git a/google/cloud/bigtable/bigtable_client_unit_tests.bzl b/google/cloud/bigtable/bigtable_client_unit_tests.bzl index faf9fa9fb9441..bb166aa1556dc 100644 --- a/google/cloud/bigtable/bigtable_client_unit_tests.bzl +++ b/google/cloud/bigtable/bigtable_client_unit_tests.bzl @@ -44,6 +44,7 @@ bigtable_client_unit_tests = [ "internal/bigtable_channel_refresh_test.cc", "internal/bigtable_stub_factory_test.cc", "internal/bulk_mutator_test.cc", + "internal/channel_usage_test.cc", "internal/connection_refresh_state_test.cc", "internal/convert_policies_test.cc", "internal/crc32c_test.cc", diff --git 
a/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl b/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl index 28109e02224c5..a3e49011a272f 100644 --- a/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl +++ b/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl @@ -79,6 +79,7 @@ google_cloud_cpp_bigtable_hdrs = [ "internal/bigtable_stub_factory.h", "internal/bigtable_tracing_stub.h", "internal/bulk_mutator.h", + "internal/channel_usage.h", "internal/client_options_defaults.h", "internal/common_client.h", "internal/connection_refresh_state.h", diff --git a/google/cloud/bigtable/internal/channel_usage.h b/google/cloud/bigtable/internal/channel_usage.h new file mode 100644 index 0000000000000..dfee041bfbbb9 --- /dev/null +++ b/google/cloud/bigtable/internal/channel_usage.h @@ -0,0 +1,123 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_CHANNEL_USAGE_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_CHANNEL_USAGE_H + +#include "google/cloud/bigtable/instance_resource.h" +#include "google/cloud/internal/clock.h" +#include "google/cloud/status_or.h" +#include "google/cloud/version.h" +#include +#include +#include +#include +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +template +class ChannelUsage : public std::enable_shared_from_this> { + public: + using Clock = ::google::cloud::internal::SteadyClock; + ChannelUsage() = default; + explicit ChannelUsage(std::shared_ptr stub, std::shared_ptr clock = + std::make_shared()) + : stub_(std::move(stub)), clock_(std::move(clock)) {} + + StatusOr average_outstanding_rpcs() { + std::unique_lock lk(mu_); + if (!last_refresh_status_.ok()) return last_refresh_status_; + if (measurements_.empty()) return 0; + auto now = clock_->Now(); + auto last_time = now; + auto window_start = now - std::chrono::seconds(60); + + double sum = 0.0; + double total_weight = 0.0; + auto iter = measurements_.rbegin(); + while (iter != measurements_.rend() && iter->timestamp >= window_start) { + double weight = + std::chrono::duration(last_time - iter->timestamp).count(); + last_time = iter->timestamp; + sum += iter->outstanding_rpcs * weight; + total_weight += weight; + ++iter; + } + measurements_.erase(measurements_.begin(), iter.base()); + return total_weight == 0.0 ? static_cast(sum) + : static_cast(sum / total_weight); + } + + StatusOr instant_outstanding_rpcs() { + std::unique_lock lk(mu_); + if (!last_refresh_status_.ok()) return last_refresh_status_; + return outstanding_rpcs_; + } + + ChannelUsage& set_last_refresh_status(Status s) { + std::unique_lock lk(mu_); + last_refresh_status_ = std::move(s); + return *this; + } + + // A channel can only be set if the current value is nullptr. 
This mutator + // exists only so that we can obtain a std::weak_ptr to the ChannelUsage + // object that will eventually hold the channel. + ChannelUsage& set_stub(std::shared_ptr stub) { + std::unique_lock lk(mu_); + if (!stub_) stub_ = std::move(stub); + return *this; + } + + std::weak_ptr> MakeWeak() { return this->shared_from_this(); } + + std::shared_ptr AcquireStub() { + std::unique_lock lk(mu_); + ++outstanding_rpcs_; + auto time = clock_->Now(); + measurements_.emplace_back(outstanding_rpcs_, time); + return stub_; + } + + void ReleaseStub() { + std::unique_lock lk(mu_); + --outstanding_rpcs_; + measurements_.emplace_back(outstanding_rpcs_, clock_->Now()); + } + + private: + mutable std::mutex mu_; + std::shared_ptr stub_; + std::shared_ptr clock_ = std::make_shared(); + int outstanding_rpcs_ = 0; + Status last_refresh_status_; + struct Measurement { + Measurement(int outstanding_rpcs, std::chrono::steady_clock::time_point p) + : outstanding_rpcs(outstanding_rpcs), timestamp(p) {} + int outstanding_rpcs; + std::chrono::steady_clock::time_point timestamp; + }; + std::deque measurements_; +}; + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_CHANNEL_USAGE_H diff --git a/google/cloud/bigtable/internal/channel_usage_test.cc b/google/cloud/bigtable/internal/channel_usage_test.cc new file mode 100644 index 0000000000000..22541fe779f73 --- /dev/null +++ b/google/cloud/bigtable/internal/channel_usage_test.cc @@ -0,0 +1,131 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/bigtable/internal/channel_usage.h" +#include "google/cloud/bigtable/testing/mock_bigtable_stub.h" +#include "google/cloud/internal/make_status.h" +#include "google/cloud/testing_util/fake_clock.h" +#include "google/cloud/testing_util/status_matchers.h" +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace { + +using ::google::cloud::bigtable::testing::MockBigtableStub; +using ::google::cloud::testing_util::IsOkAndHolds; +using ::google::cloud::testing_util::StatusIs; +using ::testing::Eq; + +TEST(ChannelUsageTest, SetChannel) { + auto mock = std::make_shared(); + auto channel = std::make_shared>(); + EXPECT_THAT(channel->AcquireStub(), Eq(nullptr)); + channel->set_stub(mock); + EXPECT_THAT(channel->AcquireStub(), Eq(mock)); + auto mock2 = std::make_shared(); + channel->set_stub(mock2); + EXPECT_THAT(channel->AcquireStub(), Eq(mock)); +} + +TEST(ChannelUsageTest, InstantOutstandingRpcs) { + // auto clock = std::make_shared(); + auto mock = std::make_shared(); + auto channel = std::make_shared>(mock); + + auto stub = channel->AcquireStub(); + EXPECT_THAT(stub, Eq(mock)); + EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(1)); + stub = channel->AcquireStub(); + EXPECT_THAT(stub, Eq(mock)); + stub = channel->AcquireStub(); + EXPECT_THAT(stub, Eq(mock)); + EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(3)); + channel->ReleaseStub(); + EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(2)); + 
channel->ReleaseStub(); + channel->ReleaseStub(); + EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(0)); +} + +TEST(ChannelUsageTest, SetLastRefreshStatus) { + auto mock = std::make_shared(); + auto channel = std::make_shared>(mock); + Status expected_status = internal::InternalError("uh oh"); + (void)channel->AcquireStub(); + EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(1)); + channel->set_last_refresh_status(expected_status); + EXPECT_THAT(channel->instant_outstanding_rpcs(), + StatusIs(expected_status.code())); + EXPECT_THAT(channel->average_outstanding_rpcs(), + StatusIs(expected_status.code())); +} + +TEST(ChannelUsageTest, AverageOutstandingRpcs) { + auto clock = std::make_shared(); + auto mock = std::make_shared(); + auto channel = std::make_shared>(mock, clock); + EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(0)); + + auto start = std::chrono::steady_clock::now(); + clock->SetTime(start); + + for (int i = 0; i < 10; ++i) (void)channel->AcquireStub(); + clock->AdvanceTime(std::chrono::seconds(1)); + // sum=10 total_weight=1 + EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(10)); + + for (int i = 0; i < 10; ++i) (void)channel->AcquireStub(); + clock->AdvanceTime(std::chrono::seconds(1)); + // sum=30, total_weight=2 + EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(15)); + + for (int i = 0; i < 20; ++i) channel->ReleaseStub(); + clock->AdvanceTime(std::chrono::seconds(1)); + // sum=30, total_weight=3 + EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(10)); + + clock->AdvanceTime(std::chrono::seconds(2)); + // sum=30, total_weight=5 + EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(6)); + + for (int i = 0; i < 100; ++i) (void)channel->AcquireStub(); + clock->AdvanceTime(std::chrono::seconds(25)); + // sum=2530, total_weight=30 + EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(84)); + + clock->AdvanceTime(std::chrono::seconds(35)); + // First 5s of 
measurements have aged out. + EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(100)); + + clock->AdvanceTime(std::chrono::seconds(60)); + // All measurements have aged out. + EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(0)); +} + +TEST(ChannelUsageTest, MakeWeak) { + auto channel = std::make_shared>(); + auto weak = channel->MakeWeak(); + EXPECT_THAT(weak.lock(), Eq(channel)); + channel.reset(); + EXPECT_THAT(weak.lock(), Eq(nullptr)); +} + +} // namespace +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google From 3358ff200f3431581331585d5d22efcfbaa37ffc Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Fri, 6 Mar 2026 14:07:45 -0500 Subject: [PATCH 2/2] update weighted average calculations --- .../cloud/bigtable/internal/channel_usage.h | 48 +++++++++++++++---- .../bigtable/internal/channel_usage_test.cc | 9 +++- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/google/cloud/bigtable/internal/channel_usage.h b/google/cloud/bigtable/internal/channel_usage.h index dfee041bfbbb9..5eff34af8aeaf 100644 --- a/google/cloud/bigtable/internal/channel_usage.h +++ b/google/cloud/bigtable/internal/channel_usage.h @@ -15,7 +15,6 @@ #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_CHANNEL_USAGE_H #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_CHANNEL_USAGE_H -#include "google/cloud/bigtable/instance_resource.h" #include "google/cloud/internal/clock.h" #include "google/cloud/status_or.h" #include "google/cloud/version.h" @@ -30,6 +29,9 @@ namespace cloud { namespace bigtable_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +// This class wraps a `T`, typically a BigtableStub, and tracks the number of +// outstanding RPCs by taking measurements when the wrapped stub is acquired +// and released. 
template class ChannelUsage : public std::enable_shared_from_this> { public: @@ -39,13 +41,18 @@ class ChannelUsage : public std::enable_shared_from_this> { std::make_shared()) : stub_(std::move(stub)), clock_(std::move(clock)) {} + // Computes the weighted average of outstanding RPCs on the channel over the + // past 60 seconds. StatusOr average_outstanding_rpcs() { - std::unique_lock lk(mu_); + auto constexpr kWindowSeconds = 60; + auto constexpr kWindowDuration = std::chrono::seconds(kWindowSeconds); + std::scoped_lock lk(mu_); if (!last_refresh_status_.ok()) return last_refresh_status_; + // If there are no measurements then the stub has never been used. if (measurements_.empty()) return 0; auto now = clock_->Now(); auto last_time = now; - auto window_start = now - std::chrono::seconds(60); + auto window_start = now - kWindowDuration; double sum = 0.0; double total_weight = 0.0; @@ -58,19 +65,38 @@ class ChannelUsage : public std::enable_shared_from_this> { total_weight += weight; ++iter; } - measurements_.erase(measurements_.begin(), iter.base()); - return total_weight == 0.0 ? static_cast(sum) + + // It's unlikely we will have a measurement at precisely the beginning of + // the window. So, we need to use the first measurement outside the window + // to compute a measurement for the missing part of the window using a + // weight equal to the missing time. + if (iter != measurements_.rend()) { + double weight = std::max(0.0, kWindowSeconds - total_weight); + sum += iter->outstanding_rpcs * weight; + total_weight += weight; + // We want to keep one measurement that's at least 60s old to provide a + // starting value for the next window. + ++iter; + } + + if (measurements_.size() > 1) { + measurements_.erase(measurements_.begin(), iter.base()); + } + // After iterating through the measurements if the total_weight is zero, + // then all of the measurements occurred at time == now, and returning the + // current number of outstanding RPCs is most correct. 
+ return total_weight == 0.0 ? outstanding_rpcs_ : static_cast(sum / total_weight); } StatusOr instant_outstanding_rpcs() { - std::unique_lock lk(mu_); + std::scoped_lock lk(mu_); if (!last_refresh_status_.ok()) return last_refresh_status_; return outstanding_rpcs_; } ChannelUsage& set_last_refresh_status(Status s) { - std::unique_lock lk(mu_); + std::scoped_lock lk(mu_); last_refresh_status_ = std::move(s); return *this; } @@ -79,7 +105,7 @@ class ChannelUsage : public std::enable_shared_from_this> { // exists only so that we can obtain a std::weak_ptr to the ChannelUsage // object that will eventually hold the channel. ChannelUsage& set_stub(std::shared_ptr stub) { - std::unique_lock lk(mu_); + std::scoped_lock lk(mu_); if (!stub_) stub_ = std::move(stub); return *this; } @@ -87,7 +113,7 @@ class ChannelUsage : public std::enable_shared_from_this> { std::weak_ptr> MakeWeak() { return this->shared_from_this(); } std::shared_ptr AcquireStub() { - std::unique_lock lk(mu_); + std::scoped_lock lk(mu_); ++outstanding_rpcs_; auto time = clock_->Now(); measurements_.emplace_back(outstanding_rpcs_, time); @@ -95,7 +121,7 @@ class ChannelUsage : public std::enable_shared_from_this> { } void ReleaseStub() { - std::unique_lock lk(mu_); + std::scoped_lock lk(mu_); --outstanding_rpcs_; measurements_.emplace_back(outstanding_rpcs_, clock_->Now()); } @@ -112,6 +138,8 @@ class ChannelUsage : public std::enable_shared_from_this> { int outstanding_rpcs; std::chrono::steady_clock::time_point timestamp; }; + // Older measurements are removed as part of the average_outstanding_rpcs + // method. 
std::deque measurements_; }; diff --git a/google/cloud/bigtable/internal/channel_usage_test.cc b/google/cloud/bigtable/internal/channel_usage_test.cc index 22541fe779f73..2553cb3f89db3 100644 --- a/google/cloud/bigtable/internal/channel_usage_test.cc +++ b/google/cloud/bigtable/internal/channel_usage_test.cc @@ -42,7 +42,6 @@ TEST(ChannelUsageTest, SetChannel) { } TEST(ChannelUsageTest, InstantOutstandingRpcs) { - // auto clock = std::make_shared(); auto mock = std::make_shared(); auto channel = std::make_shared>(mock); @@ -84,6 +83,8 @@ TEST(ChannelUsageTest, AverageOutstandingRpcs) { clock->SetTime(start); for (int i = 0; i < 10; ++i) (void)channel->AcquireStub(); + EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(10)); + clock->AdvanceTime(std::chrono::seconds(1)); // sum=10 total_weight=1 EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(10)); @@ -113,7 +114,11 @@ TEST(ChannelUsageTest, AverageOutstandingRpcs) { clock->AdvanceTime(std::chrono::seconds(60)); // All measurements have aged out. - EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(0)); + EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(100)); + + clock->AdvanceTime(std::chrono::seconds(3600)); + // All measurements have aged out. + EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(100)); } TEST(ChannelUsageTest, MakeWeak) {