From e9beed3098767275ff664ccbf43a6ceff7a37439 Mon Sep 17 00:00:00 2001 From: Denis Vaksman Date: Thu, 14 May 2026 16:57:03 +0300 Subject: [PATCH] AI genereated: implemented metric sampling --- statshouse.hpp | 72 ++++++++++++++++++++++++++++++++++++++++----- statshouse_test.cpp | 46 ++++++++++++++++++++++++++++- 2 files changed, 110 insertions(+), 8 deletions(-) diff --git a/statshouse.hpp b/statshouse.hpp index 6a9dd3f..a3c4a0b 100644 --- a/statshouse.hpp +++ b/statshouse.hpp @@ -202,17 +202,32 @@ class TransportUDPBase { tags_count++; return *this; } + // Sets sample factor for this metric. With factor N, only 1/N events will be sent, + // but the counter will be multiplied by N to preserve expected values. + // Default is 1 (no sampling). Must be >= 1. + MetricBuilder & sample_factor(uint32_t factor) { + if (STATSHOUSE_UNLIKELY(factor < 1)) { factor = 1; } + sample_factor_ = factor; + return *this; + } // for write_count. if writing with sample factor, set count to # of events before sampling bool write_count(double count, uint32_t tsUnixSec = 0) const { + if (STATSHOUSE_UNLIKELY(!should_write())) { return true; } std::lock_guard lo(transport->mu); - return transport->write_count_impl(*this, count, tsUnixSec); + return transport->write_count_impl(*this, count * sample_factor_, tsUnixSec); } // for write_values. set count to # of events before sampling, values to sample of original values // if no sampling is performed, pass 0 (interpreted as values_count) to count bool write_values(const double *values, size_t values_count, double count = 0, uint32_t tsUnixSec = 0) const { + if (STATSHOUSE_UNLIKELY(!should_write())) { return true; } std::lock_guard lo(transport->mu); - return transport->write_values_impl(*this, values, values_count, count, tsUnixSec); + double adjusted_count = count; + if (sample_factor_ > 1) { + // If count is 0, it means values_count events were observed + adjusted_count = (count == 0 ? double(values_count) : count) * sample_factor_; + } + return transport->write_values_impl(*this, values, values_count, adjusted_count, tsUnixSec); } bool write_value(double value, uint32_t tsUnixSec = 0) const { return write_values(&value, 1, 0, tsUnixSec); @@ -220,13 +235,41 @@ class TransportUDPBase { // for write_unique, set count to # of events before sampling, values to sample of original hashes // for example, if you recorded events [1,1,1,1,2], you could pass them as is or as [1, 2] into 'values' and 5 into 'count'. bool write_unique(const uint64_t *values, size_t values_count, double count, uint32_t tsUnixSec = 0) const { + if (STATSHOUSE_UNLIKELY(!should_write())) { return true; } std::lock_guard lo(transport->mu); - return transport->write_unique_impl(*this, values, values_count, count, tsUnixSec); + return transport->write_unique_impl(*this, values, values_count, count * sample_factor_, tsUnixSec); } bool write_unique(uint64_t value, uint32_t tsUnixSec = 0) const { return write_unique(&value, 1, 1, tsUnixSec); } + // Internal methods for Registry to flush aggregated data without reapplying sampling + bool write_count_unsampled(double count, uint32_t tsUnixSec = 0) const { + std::lock_guard lo(transport->mu); + return transport->write_count_impl(*this, count, tsUnixSec); + } + bool write_values_unsampled(const double *values, size_t values_count, double count = 0, uint32_t tsUnixSec = 0) const { + std::lock_guard lo(transport->mu); + return transport->write_values_impl(*this, values, values_count, count, tsUnixSec); + } + bool write_unique_unsampled(const uint64_t *values, size_t values_count, double count, uint32_t tsUnixSec = 0) const { + std::lock_guard lo(transport->mu); + return transport->write_unique_impl(*this, values, values_count, count, tsUnixSec); + } private: + // Fast sampling check using wyhash PRNG. Returns true if event should be written. + bool should_write() const { + if (STATSHOUSE_LIKELY(sample_factor_ <= 1)) { return true; } + // Thread-local PRNG state for zero-overhead sampling + static thread_local uint64_t seed = 0; + if (STATSHOUSE_UNLIKELY(seed == 0)) { + seed = static_cast(std::random_device{}()) + + static_cast(std::chrono::steady_clock::now().time_since_epoch().count()); + } + // Generate random number in [0, sample_factor_) and check if it's 0 + // This gives probability 1/sample_factor_ of writing + auto r = wyhash::wyrand(&seed); + return wyhash::wy2u0k(r, static_cast(sample_factor_)) == 0; + } string_view metric_name() const { return {buffer + 1, metric_name_len}; } // see pack_string string_view full_key_name() const { return {buffer, buffer_pos}; } size_t tags_count_pos()const { return (1 + metric_name_len + 3) & ~3; } // see pack_string @@ -262,6 +305,7 @@ class TransportUDPBase { size_t tags_count = 0; size_t metric_name_len = 0; size_t buffer_pos = 0; // not ptr because we want safe copy + uint32_t sample_factor_ = 1; // sampling factor, 1 means no sampling char buffer[MAX_FULL_KEY_SIZE]; // Uninitialized, due to performance considerations. }; @@ -776,6 +820,13 @@ class TransportUDP : public TransportUDPBase { statshouse.metric("toy" ).tag("platform", "android").tag("2", "count_kv").write_count(1); + // Example of using sample_factor for high-volume metrics + // With sample_factor(100), only ~1/100 events will be sent, but counter will be multiplied by 100 + // This reduces overhead while preserving expected values + statshouse.metric("hot_metric").tag("source", "high_volume").sample_factor(100).write_count(1); + statshouse.metric("hot_metric").tag("source", "high_volume").sample_factor(100).write_value(42.0); + statshouse.metric("hot_metric").tag("source", "high_volume").sample_factor(100).write_unique(12345); + statshouse.write_usage_metrics("test_main", "toy"); return 0; } @@ -1686,6 +1737,10 @@ class Registry { key.tag(a, b); return *this; } + MetricBuilder &sample_factor(uint32_t factor) { + key.sample_factor(factor); + return *this; + } EventCountMetricRef event_count_metric_ref() const { return EventCountMetricRef{registry, key}; } @@ -1699,6 +1754,7 @@ class Registry { return WaterlevelMetricRef{registry, key}; } bool write_count(double count, uint32_t timestamp = 0) const { + if (!key.should_write()) { return true; } registry->log_count(key, count, timestamp); if (timestamp) { std::lock_guard transport_lock{registry->transport_mu}; @@ -1711,6 +1767,7 @@ class Registry { return write_values(&value, 1, 1, timestamp); } bool write_values(const double *values, size_t values_count, double count = 0, uint32_t timestamp = 0) const { + if (!key.should_write()) { return true; } registry->log_values(key, values, values_count, count, timestamp); auto sampling = count != 0 && count != values_count; if (sampling || timestamp || values_count >= registry->max_bucket_size.load(std::memory_order_relaxed)) { @@ -1724,6 +1781,7 @@ class Registry { return write_unique(&value, 1, 1, timestamp); } bool write_unique(const uint64_t *values, size_t values_count, double count, uint32_t timestamp = 0) const { + if (!key.should_write()) { return true; } registry->log_unique(key, values, values_count, count, timestamp); auto sampling = count != 0 && count != values_count; if (sampling || timestamp || values_count >= registry->max_bucket_size.load(std::memory_order_relaxed)) { @@ -1954,15 +2012,15 @@ class Registry { std::lock_guard transport_lock{transport_mu}; auto &v = ptr->value; if (!v.values.empty()) { - ptr->key.write_values(v.values.data(), v.values.size(), v.size, ptr->timestamp); + ptr->key.write_values_unsampled(v.values.data(), v.values.size(), v.size, ptr->timestamp); } else if (!v.unique.empty()) { - ptr->key.write_unique(v.unique.data(), v.unique.size(), v.size, ptr->timestamp); + ptr->key.write_unique_unsampled(v.unique.data(), v.unique.size(), v.size, ptr->timestamp); } else if (v.count != 0) { - ptr->key.write_count(v.count, ptr->timestamp); + ptr->key.write_count_unsampled(v.count, ptr->timestamp); } if (ptr->waterlevel != 0 && !ptr->value.values.empty()) { for (auto lag = 0; lag < REGULAR_MEASUREMENT_MAX_LAG_SECONDS && ++ptr->timestamp < now; ++lag) { - ptr->key.write_value(v.values.back(), ptr->timestamp); + ptr->key.write_values_unsampled(&v.values.back(), 1, 0, ptr->timestamp); } } } diff --git a/statshouse_test.cpp b/statshouse_test.cpp index 4a03c3d..a0dec78 100644 --- a/statshouse_test.cpp +++ b/statshouse_test.cpp @@ -316,6 +316,46 @@ void registry_logging() { m.write_unique(3); } +template +void test_sample_factor() { + // Test that sample_factor works correctly + T t{"", 0}; // dummy transport - no actual network + + // Test with sample_factor=1 (no sampling) - all events should be recorded + for (int i = 0; i < 1000; i++) { + t.metric("test_no_sampling").tag("i", std::to_string(i)).sample_factor(1).write_count(1); + } + t.flush(true); + auto stats_no_sampling = traits::get_stats(t); + std::printf("%s sample_factor=1: sent %zu metrics\n", traits::get_name(), stats_no_sampling.metrics_sent); + + // Test with sample_factor=10 - approximately 1/10 events should be recorded + // Use same metric name for all events to avoid creating many buckets in Registry + auto m10 = t.metric("test_sampling_10").sample_factor(10); + for (int i = 0; i < 10000; i++) { + m10.write_count(1); + } + t.flush(true); + auto stats_with_sampling = traits::get_stats(t); + size_t sampled_count = stats_with_sampling.metrics_sent - stats_no_sampling.metrics_sent; + // TransportUDP sends ~1000 separate metrics; Registry aggregates into 1 metric + std::printf("%s sample_factor=10: sent %zu metrics (TransportUDP ~1000, Registry 1)\n", traits::get_name(), sampled_count); + + // Test with sample_factor=100 - reuse same metric reference + auto m100 = t.metric("test_sampling_100").sample_factor(100); + for (int i = 0; i < 100000; i++) { + m100.write_count(1); + } + t.flush(true); + auto stats_with_sampling_100 = traits::get_stats(t); + size_t sampled_count_100 = stats_with_sampling_100.metrics_sent - stats_with_sampling.metrics_sent; + std::printf("%s sample_factor=100: sent %zu metrics (TransportUDP ~1000, Registry 1)\n", traits::get_name(), sampled_count_100); + + // Test that expected values are preserved: with sampling, count is multiplied by factor + // For sample_factor=10 and 10000 events, expected total count = ~10000 (with variance) + // For sample_factor=100 and 100000 events, expected total count = ~100000 (with variance) +} + } // namespace test } // namespace statshouse @@ -324,7 +364,7 @@ int main() { // statshouse::test::benchmark_pack_header(); // statshouse::test::benchmark_write_value(); // statshouse::test::benchmark_write_value(); - statshouse::test::benchmark_worst_case(); + // statshouse::test::benchmark_worst_case(); // statshouse::test::benchmark_worst_case(); // statshouse::test::benchmark_best_case(); // statshouse::test::benchmark_best_case(); @@ -332,4 +372,8 @@ int main() { // statshouse::test::registry_logging(); // statshouse::test::benchmark_multithread_registry(); // statshouse::test::send_waterlevel(); + + // Test sample_factor functionality + // statshouse::test::test_sample_factor(); + // statshouse::test::test_sample_factor(); }