diff --git a/src/lib.rs b/src/lib.rs index 7d5bf1fd..d2a3a463 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,9 +32,9 @@ pub use execution_plans::{ }; pub use metrics::{ AvgLatencyMetric, BytesCounterMetric, BytesMetricExt, DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, - DistributedMetricsFormat, FirstLatencyMetric, LatencyMetricExt, MaxLatencyMetric, - MinLatencyMetric, P50LatencyMetric, P75LatencyMetric, P95LatencyMetric, P99LatencyMetric, - rewrite_distributed_plan_with_metrics, + DistributedMetricsFormat, FirstLatencyMetric, GaugeMetricExt, LatencyMetricExt, MaxGaugeMetric, + MaxLatencyMetric, MinLatencyMetric, P50LatencyMetric, P75LatencyMetric, P95LatencyMetric, + P99LatencyMetric, rewrite_distributed_plan_with_metrics, }; pub use networking::{ BoxCloneSyncChannel, ChannelResolver, DefaultChannelResolver, WorkerResolver, diff --git a/src/metrics/max_gauge_metric.rs b/src/metrics/max_gauge_metric.rs new file mode 100644 index 00000000..9091204b --- /dev/null +++ b/src/metrics/max_gauge_metric.rs @@ -0,0 +1,128 @@ +use datafusion::physical_plan::metrics::{CustomMetricValue, MetricBuilder, MetricValue}; +use std::sync::atomic::Ordering::Relaxed; +use std::{ + any::Any, + borrow::Cow, + fmt::{Display, Formatter}, + sync::{Arc, atomic::AtomicUsize}, +}; + +/// Extension trait for DataFusion's metric system that adds support for a Gauge metric that +/// aggregates to others using `max` instead of `sum` +pub trait GaugeMetricExt { + fn max_gauge(self, name: impl Into>) -> MaxGaugeMetric; +} + +impl GaugeMetricExt for MetricBuilder<'_> { + fn max_gauge(self, name: impl Into>) -> MaxGaugeMetric { + let value = MaxGaugeMetric::default(); + self.build(MetricValue::Custom { + name: name.into(), + value: Arc::new(value.clone()), + }); + value + } +} + +/// Similar to DataFusion's Gauge metric, but aggregates between instances using `max` instead of +/// `sum`. +#[derive(Debug, Clone)] +pub struct MaxGaugeMetric { + value: Arc, +} + +impl Default for MaxGaugeMetric { + fn default() -> Self { + Self { + value: Arc::new(AtomicUsize::new(usize::MIN)), + } + } +} + +impl MaxGaugeMetric { + pub fn from_value(bytes: usize) -> Self { + Self { + value: Arc::new(AtomicUsize::new(bytes)), + } + } + + pub fn value(&self) -> usize { + self.value.load(Relaxed) + } + + pub fn set_max(&self, n: usize) { + self.value.fetch_max(n, Relaxed); + } +} + +impl Display for MaxGaugeMetric { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.value()) + } +} + +impl CustomMetricValue for MaxGaugeMetric { + fn new_empty(&self) -> Arc { + Arc::new(MaxGaugeMetric::default()) + } + + fn aggregate(&self, other: Arc) { + let Some(other) = other.as_any().downcast_ref::() else { + return; + }; + self.value.fetch_max(other.value.load(Relaxed), Relaxed); + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_usize(&self) -> usize { + self.value() + } + + fn is_eq(&self, other: &Arc) -> bool { + let Some(other) = other.as_any().downcast_ref::() else { + return false; + }; + other.value() == self.value() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn default_is_zero_and_set_max_updates() { + let m = MaxGaugeMetric::default(); + assert_eq!(m.value(), 0); + m.set_max(1024); + assert_eq!(m.value(), 1024); + // Lower value should not decrease the gauge + m.set_max(512); + assert_eq!(m.value(), 1024); + // Higher value should increase it + m.set_max(2048); + assert_eq!(m.value(), 2048); + } + + #[test] + fn from_value_constructs_correctly() { + let m = MaxGaugeMetric::from_value(1_000_000); + assert_eq!(m.value(), 1_000_000); + } + + #[test] + fn aggregate_takes_max() { + let a = MaxGaugeMetric::from_value(500); + let b = MaxGaugeMetric::from_value(300); + a.aggregate(Arc::new(b)); + assert_eq!(a.value(), 500); + + let a = MaxGaugeMetric::from_value(300); + let b = MaxGaugeMetric::from_value(500); + a.aggregate(Arc::new(b)); + assert_eq!(a.value(), 500); + } +} diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index bce229f0..6e88c231 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -1,5 +1,6 @@ mod bytes_metric; mod latency_metric; +mod max_gauge_metric; pub(crate) mod proto; mod task_metrics_collector; mod task_metrics_rewriter; @@ -9,6 +10,7 @@ pub use latency_metric::{ AvgLatencyMetric, FirstLatencyMetric, LatencyMetricExt, MaxLatencyMetric, MinLatencyMetric, P50LatencyMetric, P75LatencyMetric, P95LatencyMetric, P99LatencyMetric, }; +pub use max_gauge_metric::{GaugeMetricExt, MaxGaugeMetric}; pub(crate) use task_metrics_collector::collect_plan_metrics; pub use task_metrics_rewriter::{DistributedMetricsFormat, rewrite_distributed_plan_with_metrics}; /// Label used to annotate metrics in execution plan nodes with the task in which they were executed. diff --git a/src/metrics/proto.rs b/src/metrics/proto.rs index da8d5b1b..c28ec03b 100644 --- a/src/metrics/proto.rs +++ b/src/metrics/proto.rs @@ -13,6 +13,7 @@ use super::latency_metric::{ AvgLatencyMetric, FirstLatencyMetric, MaxLatencyMetric, MinLatencyMetric, P50LatencyMetric, P75LatencyMetric, P95LatencyMetric, P99LatencyMetric, }; +use crate::MaxGaugeMetric; use crate::worker::generated::worker as pb; /// df_metrics_set_to_proto converts a [datafusion::physical_plan::metrics::MetricsSet] to a [pb::MetricsSet]. @@ -239,6 +240,15 @@ pub fn df_metric_to_proto(metric: Arc) -> Result() { + Ok(pb::Metric { + value: Some(pb::metric::Value::MaxGauge(pb::MaxGauge { + name: name.to_string(), + value: max_gauge.value() as u64, + })), + partition, + labels, + }) } else { internal_err!("{}", CUSTOM_METRICS_NOT_SUPPORTED) } @@ -554,6 +564,17 @@ pub fn metric_proto_to_df(metric: pb::Metric) -> Result, DataFusionE labels, ))) } + Some(pb::metric::Value::MaxGauge(gauge)) => { + let value = MaxGaugeMetric::from_value(gauge.value as usize); + Ok(Arc::new(Metric::new_with_labels( + MetricValue::Custom { + name: Cow::Owned(gauge.name), + value: Arc::new(value), + }, + partition, + labels, + ))) + } None => internal_err!("proto metric is missing the metric field"), } } diff --git a/src/worker/generated/worker.rs b/src/worker/generated/worker.rs index 728fa2d2..74c906d1 100644 --- a/src/worker/generated/worker.rs +++ b/src/worker/generated/worker.rs @@ -159,7 +159,7 @@ pub struct Metric { pub partition: ::core::option::Option, #[prost( oneof = "metric::Value", - tags = "10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33" + tags = "10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34" )] pub value: ::core::option::Option, } @@ -215,6 +215,8 @@ pub mod metric { CustomP95Latency(super::PercentileLatency), #[prost(message, tag = "33")] CustomP99Latency(super::PercentileLatency), + #[prost(message, tag = "34")] + MaxGauge(super::MaxGauge), } } /// A MetricsSet is a protobuf mirror of datafusion::physical_plan::metrics::MetricsSet. It represents @@ -357,6 +359,13 @@ pub struct PercentileLatency { #[prost(bytes = "vec", tag = "4")] pub sketch_bytes: ::prost::alloc::vec::Vec, } +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct MaxGauge { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, + #[prost(uint64, tag = "2")] + pub value: u64, +} /// Generated client implementations. pub mod worker_service_client { #![allow( diff --git a/src/worker/worker.proto b/src/worker/worker.proto index 81217578..7aaeebc1 100644 --- a/src/worker/worker.proto +++ b/src/worker/worker.proto @@ -180,6 +180,7 @@ message Metric { PercentileLatency custom_p75_latency = 31; PercentileLatency custom_p95_latency = 32; PercentileLatency custom_p99_latency = 33; + MaxGauge max_gauge = 34; } } @@ -286,3 +287,8 @@ message PercentileLatency { string name = 1; bytes sketch_bytes = 4; } + +message MaxGauge { + string name = 1; + uint64 value = 2; +} \ No newline at end of file