Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 65 additions & 7 deletions statshouse.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,31 +202,74 @@ class TransportUDPBase {
tags_count++;
return *this;
}
// Sets sample factor for this metric. With factor N, only 1/N events will be sent,
// but the counter will be multiplied by N to preserve expected values.
// Default is 1 (no sampling). Must be >= 1.
MetricBuilder & sample_factor(uint32_t factor) {
if (STATSHOUSE_UNLIKELY(factor < 1)) { factor = 1; }
sample_factor_ = factor;
return *this;
}

// for write_count. if writing with sample factor, set count to # of events before sampling
bool write_count(double count, uint32_t tsUnixSec = 0) const {
if (STATSHOUSE_UNLIKELY(!should_write())) { return true; }
std::lock_guard<mutex> lo(transport->mu);
return transport->write_count_impl(*this, count, tsUnixSec);
return transport->write_count_impl(*this, count * sample_factor_, tsUnixSec);
}
// for write_values. set count to # of events before sampling, values to sample of original values
// if no sampling is performed, pass 0 (interpreted as values_count) to count
bool write_values(const double *values, size_t values_count, double count = 0, uint32_t tsUnixSec = 0) const {
if (STATSHOUSE_UNLIKELY(!should_write())) { return true; }
std::lock_guard<mutex> lo(transport->mu);
return transport->write_values_impl(*this, values, values_count, count, tsUnixSec);
double adjusted_count = count;
if (sample_factor_ > 1) {
// If count is 0, it means values_count events were observed
adjusted_count = (count == 0 ? double(values_count) : count) * sample_factor_;
}
return transport->write_values_impl(*this, values, values_count, adjusted_count, tsUnixSec);
}
bool write_value(double value, uint32_t tsUnixSec = 0) const {
return write_values(&value, 1, 0, tsUnixSec);
}
// for write_unique, set count to # of events before sampling, values to sample of original hashes
// for example, if you recorded events [1,1,1,1,2], you could pass them as is or as [1, 2] into 'values' and 5 into 'count'.
bool write_unique(const uint64_t *values, size_t values_count, double count, uint32_t tsUnixSec = 0) const {
if (STATSHOUSE_UNLIKELY(!should_write())) { return true; }
std::lock_guard<mutex> lo(transport->mu);
return transport->write_unique_impl(*this, values, values_count, count, tsUnixSec);
return transport->write_unique_impl(*this, values, values_count, count * sample_factor_, tsUnixSec);
}
bool write_unique(uint64_t value, uint32_t tsUnixSec = 0) const {
return write_unique(&value, 1, 1, tsUnixSec);
}
// Internal methods for Registry to flush aggregated data without reapplying sampling
bool write_count_unsampled(double count, uint32_t tsUnixSec = 0) const {
std::lock_guard<mutex> lo(transport->mu);
return transport->write_count_impl(*this, count, tsUnixSec);
}
bool write_values_unsampled(const double *values, size_t values_count, double count = 0, uint32_t tsUnixSec = 0) const {
std::lock_guard<mutex> lo(transport->mu);
return transport->write_values_impl(*this, values, values_count, count, tsUnixSec);
}
bool write_unique_unsampled(const uint64_t *values, size_t values_count, double count, uint32_t tsUnixSec = 0) const {
std::lock_guard<mutex> lo(transport->mu);
return transport->write_unique_impl(*this, values, values_count, count, tsUnixSec);
}
private:
// Fast sampling check using wyhash PRNG. Returns true if event should be written.
bool should_write() const {
if (STATSHOUSE_LIKELY(sample_factor_ <= 1)) { return true; }
// Thread-local PRNG state for zero-overhead sampling
static thread_local uint64_t seed = 0;
if (STATSHOUSE_UNLIKELY(seed == 0)) {
seed = static_cast<uint64_t>(std::random_device{}()) +
static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
}
// Generate random number in [0, sample_factor_) and check if it's 0
// This gives probability 1/sample_factor_ of writing
auto r = wyhash::wyrand(&seed);
return wyhash::wy2u0k(r, static_cast<uint64_t>(sample_factor_)) == 0;
}
string_view metric_name() const { return {buffer + 1, metric_name_len}; } // see pack_string
string_view full_key_name() const { return {buffer, buffer_pos}; }
size_t tags_count_pos()const { return (1 + metric_name_len + 3) & ~3; } // see pack_string
Expand Down Expand Up @@ -262,6 +305,7 @@ class TransportUDPBase {
size_t tags_count = 0;
size_t metric_name_len = 0;
size_t buffer_pos = 0; // not ptr because we want safe copy
uint32_t sample_factor_ = 1; // sampling factor, 1 means no sampling
char buffer[MAX_FULL_KEY_SIZE]; // Uninitialized, due to performance considerations.
};

Expand Down Expand Up @@ -776,6 +820,13 @@ class TransportUDP : public TransportUDPBase {

statshouse.metric("toy" ).tag("platform", "android").tag("2", "count_kv").write_count(1);

// Example of using sample_factor for high-volume metrics
// With sample_factor(100), only ~1/100 events will be sent, but counter will be multiplied by 100
// This reduces overhead while preserving expected values
statshouse.metric("hot_metric").tag("source", "high_volume").sample_factor(100).write_count(1);
statshouse.metric("hot_metric").tag("source", "high_volume").sample_factor(100).write_value(42.0);
statshouse.metric("hot_metric").tag("source", "high_volume").sample_factor(100).write_unique(12345);

statshouse.write_usage_metrics("test_main", "toy");
return 0;
}
Expand Down Expand Up @@ -1686,6 +1737,10 @@ class Registry {
key.tag(a, b);
return *this;
}
MetricBuilder &sample_factor(uint32_t factor) {
key.sample_factor(factor);
return *this;
}
EventCountMetricRef event_count_metric_ref() const {
return EventCountMetricRef{registry, key};
}
Expand All @@ -1699,6 +1754,7 @@ class Registry {
return WaterlevelMetricRef{registry, key};
}
bool write_count(double count, uint32_t timestamp = 0) const {
if (!key.should_write()) { return true; }
registry->log_count(key, count, timestamp);
if (timestamp) {
std::lock_guard<std::mutex> transport_lock{registry->transport_mu};
Expand All @@ -1711,6 +1767,7 @@ class Registry {
return write_values(&value, 1, 1, timestamp);
}
bool write_values(const double *values, size_t values_count, double count = 0, uint32_t timestamp = 0) const {
if (!key.should_write()) { return true; }
registry->log_values(key, values, values_count, count, timestamp);
auto sampling = count != 0 && count != values_count;
if (sampling || timestamp || values_count >= registry->max_bucket_size.load(std::memory_order_relaxed)) {
Expand All @@ -1724,6 +1781,7 @@ class Registry {
return write_unique(&value, 1, 1, timestamp);
}
bool write_unique(const uint64_t *values, size_t values_count, double count, uint32_t timestamp = 0) const {
if (!key.should_write()) { return true; }
registry->log_unique(key, values, values_count, count, timestamp);
auto sampling = count != 0 && count != values_count;
if (sampling || timestamp || values_count >= registry->max_bucket_size.load(std::memory_order_relaxed)) {
Expand Down Expand Up @@ -1954,15 +2012,15 @@ class Registry {
std::lock_guard<std::mutex> transport_lock{transport_mu};
auto &v = ptr->value;
if (!v.values.empty()) {
ptr->key.write_values(v.values.data(), v.values.size(), v.size, ptr->timestamp);
ptr->key.write_values_unsampled(v.values.data(), v.values.size(), v.size, ptr->timestamp);
} else if (!v.unique.empty()) {
ptr->key.write_unique(v.unique.data(), v.unique.size(), v.size, ptr->timestamp);
ptr->key.write_unique_unsampled(v.unique.data(), v.unique.size(), v.size, ptr->timestamp);
} else if (v.count != 0) {
ptr->key.write_count(v.count, ptr->timestamp);
ptr->key.write_count_unsampled(v.count, ptr->timestamp);
}
if (ptr->waterlevel != 0 && !ptr->value.values.empty()) {
for (auto lag = 0; lag < REGULAR_MEASUREMENT_MAX_LAG_SECONDS && ++ptr->timestamp < now; ++lag) {
ptr->key.write_value(v.values.back(), ptr->timestamp);
ptr->key.write_values_unsampled(&v.values.back(), 1, 0, ptr->timestamp);
}
}
}
Expand Down
46 changes: 45 additions & 1 deletion statshouse_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,46 @@ void registry_logging() {
m.write_unique(3);
}

template<typename T>
void test_sample_factor() {
// Test that sample_factor works correctly
T t{"", 0}; // dummy transport - no actual network

// Test with sample_factor=1 (no sampling) - all events should be recorded
for (int i = 0; i < 1000; i++) {
t.metric("test_no_sampling").tag("i", std::to_string(i)).sample_factor(1).write_count(1);
}
t.flush(true);
auto stats_no_sampling = traits<T>::get_stats(t);
std::printf("%s sample_factor=1: sent %zu metrics\n", traits<T>::get_name(), stats_no_sampling.metrics_sent);

// Test with sample_factor=10 - approximately 1/10 events should be recorded
// Use same metric name for all events to avoid creating many buckets in Registry
auto m10 = t.metric("test_sampling_10").sample_factor(10);
for (int i = 0; i < 10000; i++) {
m10.write_count(1);
}
t.flush(true);
auto stats_with_sampling = traits<T>::get_stats(t);
size_t sampled_count = stats_with_sampling.metrics_sent - stats_no_sampling.metrics_sent;
// TransportUDP sends ~1000 separate metrics; Registry aggregates into 1 metric
std::printf("%s sample_factor=10: sent %zu metrics (TransportUDP ~1000, Registry 1)\n", traits<T>::get_name(), sampled_count);

// Test with sample_factor=100 - reuse same metric reference
auto m100 = t.metric("test_sampling_100").sample_factor(100);
for (int i = 0; i < 100000; i++) {
m100.write_count(1);
}
t.flush(true);
auto stats_with_sampling_100 = traits<T>::get_stats(t);
size_t sampled_count_100 = stats_with_sampling_100.metrics_sent - stats_with_sampling.metrics_sent;
std::printf("%s sample_factor=100: sent %zu metrics (TransportUDP ~1000, Registry 1)\n", traits<T>::get_name(), sampled_count_100);

// Test that expected values are preserved: with sampling, count is multiplied by factor
// For sample_factor=10 and 10000 events, expected total count = ~10000 (with variance)
// For sample_factor=100 and 100000 events, expected total count = ~100000 (with variance)
}

} // namespace test
} // namespace statshouse

Expand All @@ -324,12 +364,16 @@ int main() {
// statshouse::test::benchmark_pack_header<statshouse::Registry>();
// statshouse::test::benchmark_write_value<statshouse::TransportUDP>();
// statshouse::test::benchmark_write_value<statshouse::Registry>();
statshouse::test::benchmark_worst_case<statshouse::Registry>();
// statshouse::test::benchmark_worst_case<statshouse::Registry>();
// statshouse::test::benchmark_worst_case<statshouse::TransportUDP>();
// statshouse::test::benchmark_best_case<statshouse::TransportUDP>();
// statshouse::test::benchmark_best_case<statshouse::Registry>();
// statshouse::test::send_regular();
// statshouse::test::registry_logging();
// statshouse::test::benchmark_multithread_registry();
// statshouse::test::send_waterlevel();

// Test sample_factor functionality
// statshouse::test::test_sample_factor<statshouse::TransportUDP>();
// statshouse::test::test_sample_factor<statshouse::Registry>();
}