From dfb7b07ad684cc2e5cf462fca3d9e2fe9aa99852 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Sat, 9 May 2026 18:34:34 +0200 Subject: [PATCH] Add metrics to WorkUnitFeed --- src/common/mod.rs | 2 + src/common/time.rs | 9 ++ src/coordinator/task_spawner.rs | 14 +- src/metrics/latency_metric.rs | 30 ++++- src/test_utils/work_unit_file_scan.rs | 11 +- src/work_unit_feed/mod.rs | 5 +- src/work_unit_feed/remote_work_unit_feed.rs | 121 ++++++++++++++++-- src/work_unit_feed/work_unit_feed.rs | 30 +++-- src/work_unit_feed/work_unit_feed_provider.rs | 6 + src/worker/generated/worker.rs | 12 ++ src/worker/impl_coordinator_channel.rs | 7 +- src/worker/impl_execute_task.rs | 9 +- src/worker/worker.proto | 8 ++ src/worker/worker_service.rs | 4 +- 14 files changed, 210 insertions(+), 58 deletions(-) create mode 100644 src/common/time.rs diff --git a/src/common/mod.rs b/src/common/mod.rs index de4e60e5..1706261c 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -4,6 +4,7 @@ mod on_drop_stream; mod once_lock; mod recursion; mod task_context_helpers; +mod time; mod uuid; pub(crate) use children_helpers::require_one_child; @@ -12,4 +13,5 @@ pub(crate) use on_drop_stream::on_drop_stream; pub(crate) use once_lock::OnceLockResult; pub(crate) use recursion::TreeNodeExt; pub(crate) use task_context_helpers::task_ctx_with_extension; +pub(crate) use time::now_nanos; pub(crate) use uuid::{deserialize_uuid, serialize_uuid}; diff --git a/src/common/time.rs b/src/common/time.rs new file mode 100644 index 00000000..e7bc1455 --- /dev/null +++ b/src/common/time.rs @@ -0,0 +1,9 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Returns the number of nanoseconds since Unix epoch. +pub(crate) fn now_nanos() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_nanos() as u64) + .unwrap_or(0) +} diff --git a/src/coordinator/task_spawner.rs b/src/coordinator/task_spawner.rs index c959a75b..83057278 100644 --- a/src/coordinator/task_spawner.rs +++ b/src/coordinator/task_spawner.rs @@ -4,6 +4,7 @@ use crate::coordinator::MetricsStore; use crate::passthrough_headers::get_passthrough_headers; use crate::protobuf::tonic_status_to_datafusion_error; use crate::stage::LocalStage; +use crate::work_unit_feed::{build_work_unit_msg, set_work_unit_send_time}; use crate::worker::generated::worker as pb; use crate::worker::generated::worker::coordinator_to_worker_msg::Inner; use crate::worker::generated::worker::set_plan_request::WorkUnitFeedDeclaration; @@ -166,8 +167,9 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { let request = Request::from_parts( MetadataMap::from_headers(headers), Extensions::default(), - futures::stream::once(async { msg }) - .chain(UnboundedReceiverStream::new(coordinator_to_worker_rx)), + futures::stream::once(async { msg }).chain( + UnboundedReceiverStream::new(coordinator_to_worker_rx).map(set_work_unit_send_time), + ), ); let metrics = self.metrics.clone(); @@ -275,13 +277,7 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { // so they must be encoded in order to send them over the wire. while let Some(data_or_err) = work_unit_feed.next().await { if tx - .send(pb::CoordinatorToWorkerMsg { - inner: Some(Inner::WorkUnit(pb::WorkUnit { - id: serialize_uuid(&id), - partition: partition as u64, - body: data_or_err?.encode_to_bytes(), - })), - }) + .send(build_work_unit_msg(&id, partition, data_or_err?)) .is_err() { break; // channel closed. diff --git a/src/metrics/latency_metric.rs b/src/metrics/latency_metric.rs index 92313d52..b01f7adb 100644 --- a/src/metrics/latency_metric.rs +++ b/src/metrics/latency_metric.rs @@ -128,7 +128,11 @@ impl MinLatencyMetric { pub fn add_duration(&self, duration: Duration) { let more_nanos = duration.as_nanos() as usize; - self.nanos.fetch_min(more_nanos.max(1), Relaxed); + self.add_nanos(more_nanos); + } + + pub fn add_nanos(&self, nanos: usize) { + self.nanos.fetch_min(nanos.max(1), Relaxed); } } @@ -191,7 +195,11 @@ impl MaxLatencyMetric { pub fn add_duration(&self, duration: Duration) { let more_nanos = duration.as_nanos() as usize; - self.nanos.fetch_max(more_nanos.max(1), Relaxed); + self.add_nanos(more_nanos); + } + + pub fn add_nanos(&self, nanos: usize) { + self.nanos.fetch_max(nanos.max(1), Relaxed); } } @@ -261,7 +269,11 @@ impl AvgLatencyMetric { pub fn add_duration(&self, duration: Duration) { let more_nanos = duration.as_nanos() as usize; - self.nanos_sum.fetch_add(more_nanos.max(1), Relaxed); + self.add_nanos(more_nanos); + } + + pub fn add_nanos(&self, nanos: usize) { + self.nanos_sum.fetch_add(nanos.max(1), Relaxed); self.count.fetch_add(1, Relaxed); } } @@ -326,6 +338,10 @@ impl FirstLatencyMetric { pub fn add_duration(&self, duration: Duration) { let nanos = duration.as_nanos() as usize; + self.add_nanos(nanos); + } + + pub fn add_nanos(&self, nanos: usize) { // compare_exchange: only set if still at the sentinel value (0). let _ = self .nanos @@ -424,8 +440,12 @@ macro_rules! percentile_latency_metric { } pub fn add_duration(&self, duration: Duration) { - let nanos = (duration.as_nanos() as usize).max(1) as f64; - self.inner.lock().unwrap().add(nanos); + let nanos = (duration.as_nanos() as usize).max(1); + self.add_nanos(nanos); + } + + pub fn add_nanos(&self, nanos: usize) { + self.inner.lock().unwrap().add(nanos as f64); } } diff --git a/src/test_utils/work_unit_file_scan.rs b/src/test_utils/work_unit_file_scan.rs index 52df7476..e7620e69 100644 --- a/src/test_utils/work_unit_file_scan.rs +++ b/src/test_utils/work_unit_file_scan.rs @@ -54,15 +54,11 @@ pub struct FileScanWorkUnitProvider { /// `None` slots are sent as empty work units so the worker emits an empty /// stream for that partition. file_groups: Vec, - metrics: ExecutionPlanMetricsSet, } impl FileScanWorkUnitProvider { pub fn new(file_groups: Vec) -> Self { - Self { - file_groups, - metrics: ExecutionPlanMetricsSet::new(), - } + Self { file_groups } } } @@ -208,10 +204,7 @@ impl DataSource for WorkUnitFileScanConfig { } fn metrics(&self) -> ExecutionPlanMetricsSet { - self.feed - .inner() - .map(|p| p.metrics.clone()) - .unwrap_or_default() + self.feed.metrics() } fn try_swapping_with_projection( diff --git a/src/work_unit_feed/mod.rs b/src/work_unit_feed/mod.rs index fd3b32d5..8842c03f 100644 --- a/src/work_unit_feed/mod.rs +++ b/src/work_unit_feed/mod.rs @@ -5,7 +5,10 @@ mod work_unit_feed; mod work_unit_feed_provider; mod work_unit_feed_registry; -pub(crate) use remote_work_unit_feed::RemoteWorkUnitFeedRegistry; +pub(crate) use remote_work_unit_feed::{ + RemoteWorkUnitFeedRegistry, build_work_unit_msg, set_work_unit_received_time, + set_work_unit_send_time, +}; pub(crate) use work_unit_feed_registry::{WorkUnitFeedRegistry, set_distributed_work_unit_feed}; pub use work_unit::WorkUnit; diff --git a/src/work_unit_feed/remote_work_unit_feed.rs b/src/work_unit_feed/remote_work_unit_feed.rs index 17106f0f..552a195c 100644 --- a/src/work_unit_feed/remote_work_unit_feed.rs +++ b/src/work_unit_feed/remote_work_unit_feed.rs @@ -1,6 +1,11 @@ -use crate::WorkUnitFeedProvider; -use datafusion::common::{HashMap, exec_err}; +use crate::common::{now_nanos, serialize_uuid}; +use crate::worker::generated::worker as pb; +use crate::{BytesMetricExt, LatencyMetricExt, WorkUnit}; +use datafusion::common::{HashMap, Result, exec_err}; use datafusion::execution::TaskContext; +use datafusion::physical_expr_common::metrics::MetricBuilder; +use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion_proto::protobuf::proto_error; use futures::StreamExt; use futures::stream::BoxStream; use std::sync::{Arc, Mutex}; @@ -8,8 +13,8 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio_stream::wrappers::UnboundedReceiverStream; use uuid::Uuid; -pub(crate) type WorkUnitTx = UnboundedSender>>; -pub(crate) type WorkUnitRx = UnboundedReceiver>>; +pub(crate) type WorkUnitTx = UnboundedSender>; +pub(crate) type WorkUnitRx = UnboundedReceiver>; pub(crate) type RemoteWorkUnitFeedRxs = HashMap<(Uuid, usize), Mutex>>; pub(crate) type RemoteWorkUnitFeedTxs = HashMap<(Uuid, usize), WorkUnitTx>; @@ -42,6 +47,50 @@ impl RemoteWorkUnitFeedRegistry { } } +pub(crate) fn build_work_unit_msg( + id: &Uuid, + partition: usize, + work_unit: Box, +) -> pb::CoordinatorToWorkerMsg { + pb::CoordinatorToWorkerMsg { + inner: Some(pb::coordinator_to_worker_msg::Inner::WorkUnit( + pb::WorkUnit { + id: serialize_uuid(id), + partition: partition as u64, + body: work_unit.encode_to_bytes(), + created_timestamp_unix_nanos: now_nanos(), + sent_timestamp_unix_nanos: 0, + received_timestamp_unix_nanos: 0, + processed_timestamp_unix_nanos: 0, + }, + )), + } +} + +pub(crate) fn set_work_unit_send_time( + mut msg: pb::CoordinatorToWorkerMsg, +) -> pb::CoordinatorToWorkerMsg { + if let pb::CoordinatorToWorkerMsg { + inner: Some(pb::coordinator_to_worker_msg::Inner::WorkUnit(work_unit)), + } = &mut msg + { + work_unit.sent_timestamp_unix_nanos = now_nanos(); + } + msg +} + +pub(crate) fn set_work_unit_received_time( + mut msg: pb::CoordinatorToWorkerMsg, +) -> pb::CoordinatorToWorkerMsg { + if let pb::CoordinatorToWorkerMsg { + inner: Some(pb::coordinator_to_worker_msg::Inner::WorkUnit(work_unit)), + } = &mut msg + { + work_unit.received_timestamp_unix_nanos = now_nanos(); + } + msg +} + /// Remove implementation of a [WorkUnitFeedProvider] that pulls [crate::WorkUnit]s coming over /// the wire from a [RemoteWorkUnitFeedRegistry]. /// @@ -53,17 +102,31 @@ impl RemoteWorkUnitFeedRegistry { #[derive(Debug, Clone)] pub(crate) struct RemoteFeedProvider { pub(crate) id: Uuid, + pub(crate) metrics: ExecutionPlanMetricsSet, } -impl WorkUnitFeedProvider for RemoteFeedProvider { - type WorkUnit = Vec; - - fn feed( +impl RemoteFeedProvider { + pub(crate) fn feed( &self, partition: usize, ctx: Arc, - ) -> datafusion::common::Result>> - { + ) -> Result>> { + let bdr = || MetricBuilder::new(&self.metrics); + + let bytes_transferred = bdr().bytes_counter("work_unit_bytes"); + let msg_count = bdr().global_counter("work_unit_count"); + // Track end-to-end network latency distribution for all work units. + let send_latency_max = bdr().max_latency("work_unit_send_latency_max"); + let send_latency_p50 = bdr().p50_latency("work_unit_send_latency_p50"); + + let received_latency_max = bdr().max_latency("work_unit_received_latency_max"); + let received_latency_p50 = bdr().p50_latency("work_unit_received_latency_p50"); + + let processed_latency_max = bdr().max_latency("work_unit_processed_latency_max"); + let processed_latency_p50 = bdr().p50_latency("work_unit_processed_latency_p50"); + + let elapsed_compute = bdr().elapsed_compute(partition); + let Some(rxs) = ctx .session_config() .get_extension::() @@ -84,6 +147,42 @@ impl WorkUnitFeedProvider for RemoteFeedProvider { ); }; - Ok(UnboundedReceiverStream::new(receiver).boxed()) + Ok(UnboundedReceiverStream::new(receiver) + .map(move |work_unit_or_err| { + let mut work_unit = work_unit_or_err?; + let timer = elapsed_compute.timer(); + let result = T::decode(work_unit.body.as_slice()) + .map_err(|err| proto_error(format!("{err}"))); + timer.done(); + work_unit.processed_timestamp_unix_nanos = now_nanos(); + + let pb::WorkUnit { + created_timestamp_unix_nanos: base, + sent_timestamp_unix_nanos, + received_timestamp_unix_nanos, + processed_timestamp_unix_nanos, + body, + .. + } = work_unit; + + bytes_transferred.add_bytes(body.len()); + msg_count.add(1); + + send_latency_max.add_nanos((sent_timestamp_unix_nanos - base) as usize); + send_latency_p50.add_nanos((sent_timestamp_unix_nanos - base) as usize); + + received_latency_max.add_nanos((received_timestamp_unix_nanos - base) as usize); + received_latency_p50.add_nanos((received_timestamp_unix_nanos - base) as usize); + + processed_latency_max.add_nanos((processed_timestamp_unix_nanos - base) as usize); + processed_latency_p50.add_nanos((processed_timestamp_unix_nanos - base) as usize); + + result + }) + .boxed()) + } + + pub(crate) fn metrics(&self) -> ExecutionPlanMetricsSet { + self.metrics.clone() } } diff --git a/src/work_unit_feed/work_unit_feed.rs b/src/work_unit_feed/work_unit_feed.rs index 70aaf079..442cf2a4 100644 --- a/src/work_unit_feed/work_unit_feed.rs +++ b/src/work_unit_feed/work_unit_feed.rs @@ -3,10 +3,8 @@ use crate::common::{deserialize_uuid, serialize_uuid}; use crate::work_unit_feed::remote_work_unit_feed::RemoteFeedProvider; use datafusion::common::{Result, internal_err}; use datafusion::execution::TaskContext; -use datafusion_proto::protobuf::proto_error; -use futures::StreamExt; +use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use futures::stream::BoxStream; -use prost::Message; use std::fmt::Debug; use std::sync::Arc; use uuid::Uuid; @@ -143,14 +141,14 @@ impl RemoteOrLocalProvider { ) -> Result>> { match self { Self::Local(local) => local.feed(partition, ctx), - Self::Remote(remote) => Ok(remote - .feed(partition, ctx)? - .map(|bytes_or_err| { - let bytes = bytes_or_err?; - T::WorkUnit::decode(bytes.as_slice()) - .map_err(|err| proto_error(format!("{err}"))) - }) - .boxed()), + Self::Remote(remote) => Ok(remote.feed::(partition, ctx)?), + } + } + + fn metrics(&self) -> ExecutionPlanMetricsSet { + match self { + Self::Local(local) => local.metrics(), + Self::Remote(remote) => remote.metrics(), } } } @@ -181,7 +179,10 @@ impl WorkUnitFeed { let id = deserialize_uuid(&proto.id)?; Ok(WorkUnitFeed { id, - provider: RemoteOrLocalProvider::Remote(RemoteFeedProvider { id }), + provider: RemoteOrLocalProvider::Remote(RemoteFeedProvider { + id, + metrics: ExecutionPlanMetricsSet::new(), + }), }) } @@ -273,4 +274,9 @@ impl WorkUnitFeed { ) -> Result>> { self.provider.feed(partition, ctx) } + + /// DataFusion metrics collected at runtime while streaming [WorkUnit]s through [Self::feed]. + pub fn metrics(&self) -> ExecutionPlanMetricsSet { + self.provider.metrics() + } } diff --git a/src/work_unit_feed/work_unit_feed_provider.rs b/src/work_unit_feed/work_unit_feed_provider.rs index 3c935e40..aab04c88 100644 --- a/src/work_unit_feed/work_unit_feed_provider.rs +++ b/src/work_unit_feed/work_unit_feed_provider.rs @@ -1,6 +1,7 @@ use crate::WorkUnit; use datafusion::common::Result; use datafusion::execution::TaskContext; +use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use futures::stream::BoxStream; use std::fmt::Debug; use std::sync::Arc; @@ -81,6 +82,11 @@ pub trait WorkUnitFeedProvider: Send + Sync + Debug { partition: usize, ctx: Arc, ) -> Result>>; + + /// DataFusion metrics collected at runtime while streaming [WorkUnit]s through [Self::feed]. + fn metrics(&self) -> ExecutionPlanMetricsSet { + ExecutionPlanMetricsSet::new() + } } /// Provides contextual information about where a [WorkUnitFeedProvider] is being executed. When diff --git a/src/worker/generated/worker.rs b/src/worker/generated/worker.rs index 57aa698a..eaebf63f 100644 --- a/src/worker/generated/worker.rs +++ b/src/worker/generated/worker.rs @@ -98,6 +98,18 @@ pub struct WorkUnit { /// Arbitrary user-defined data (e.g., a file address) necessary during execution. #[prost(bytes = "vec", tag = "3")] pub body: ::prost::alloc::vec::Vec, + /// Unix timestamp in nanoseconds at which this message was created. + #[prost(uint64, tag = "4")] + pub created_timestamp_unix_nanos: u64, + /// Unix timestamp in nanoseconds at which this message was sent. + #[prost(uint64, tag = "5")] + pub sent_timestamp_unix_nanos: u64, + /// Unix timestamp in nanoseconds at which this message was received. + #[prost(uint64, tag = "6")] + pub received_timestamp_unix_nanos: u64, + /// Unix timestamp in nanoseconds at which this message was processed. + #[prost(uint64, tag = "7")] + pub processed_timestamp_unix_nanos: u64, } #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct ExecuteTaskRequest { diff --git a/src/worker/impl_coordinator_channel.rs b/src/worker/impl_coordinator_channel.rs index dfc5da33..85f21a44 100644 --- a/src/worker/impl_coordinator_channel.rs +++ b/src/worker/impl_coordinator_channel.rs @@ -1,5 +1,5 @@ use crate::common::deserialize_uuid; -use crate::work_unit_feed::RemoteWorkUnitFeedRegistry; +use crate::work_unit_feed::{RemoteWorkUnitFeedRegistry, set_work_unit_received_time}; use crate::worker::LocalWorkerContext; use crate::worker::generated::worker::coordinator_to_worker_msg::Inner; use crate::worker::generated::worker::set_plan_request::WorkUnitFeedDeclaration; @@ -16,7 +16,7 @@ use datafusion::execution::SessionStateBuilder; use datafusion::prelude::SessionConfig; use datafusion_proto::physical_plan::AsExecutionPlan; use datafusion_proto::protobuf::PhysicalPlanNode; -use futures::{FutureExt, StreamExt}; +use futures::{FutureExt, StreamExt, TryStreamExt}; use std::sync::Arc; use std::sync::atomic::AtomicUsize; use tokio::sync::oneshot; @@ -121,6 +121,7 @@ impl Worker { let work_unit_senders = remote_work_unit_feed_registry.senders; #[allow(clippy::disallowed_methods)] tokio::spawn(async move { + let mut body = body.map_ok(set_work_unit_received_time); while let Some(Ok(msg)) = body.next().await { let Some(Inner::WorkUnit(msg)) = msg.inner else { continue; @@ -131,7 +132,7 @@ impl Worker { let Some(tx) = work_unit_senders.get(&(id, msg.partition as usize)) else { continue; }; - if tx.send(Ok(msg.body)).is_err() { + if tx.send(Ok(msg)).is_err() { break; // channel closed } } diff --git a/src/worker/impl_execute_task.rs b/src/worker/impl_execute_task.rs index 13e3a4a8..e87c1052 100644 --- a/src/worker/impl_execute_task.rs +++ b/src/worker/impl_execute_task.rs @@ -1,5 +1,5 @@ use crate::DistributedConfig; -use crate::common::{map_last_stream, on_drop_stream}; +use crate::common::{map_last_stream, now_nanos, on_drop_stream}; use crate::metrics::proto::df_metrics_set_to_proto; use crate::protobuf::datafusion_error_to_tonic_status; use crate::worker::generated::worker::{FlightAppMetadata, PreOrderTaskMetrics}; @@ -26,7 +26,7 @@ use prost::Message; use std::sync::Arc; use std::sync::Mutex; use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::Duration; use tokio::sync::oneshot::Sender; use tokio_stream::StreamExt; use tonic::{Request, Response, Status}; @@ -180,10 +180,7 @@ pub(crate) async fn execute_remote_task( // the original per-partition streams in later steps. let flight_data = FlightAppMetadata { partition, - created_timestamp_unix_nanos: SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|duration| duration.as_nanos() as u64) - .unwrap_or(0), + created_timestamp_unix_nanos: now_nanos(), }; msg.map(|v| v.with_app_metadata(flight_data.encode_to_vec())) }); diff --git a/src/worker/worker.proto b/src/worker/worker.proto index 6372f31b..7c00f9b2 100644 --- a/src/worker/worker.proto +++ b/src/worker/worker.proto @@ -79,6 +79,14 @@ message WorkUnit { uint64 partition = 2; // Arbitrary user-defined data (e.g., a file address) necessary during execution. bytes body = 3; + // Unix timestamp in nanoseconds at which this message was created in the coordinator. + uint64 created_timestamp_unix_nanos = 4; + // Unix timestamp in nanoseconds at which this message was sent by the coordinator. + uint64 sent_timestamp_unix_nanos = 5; + // Unix timestamp in nanoseconds at which this message was received by a worker. + uint64 received_timestamp_unix_nanos = 6; + // Unix timestamp in nanoseconds at which this message started being processed. + uint64 processed_timestamp_unix_nanos = 7; } message ExecuteTaskRequest { diff --git a/src/worker/worker_service.rs b/src/worker/worker_service.rs index f081ae7f..0c572588 100644 --- a/src/worker/worker_service.rs +++ b/src/worker/worker_service.rs @@ -1,4 +1,3 @@ -use super::generated::worker::{GetWorkerInfoRequest, GetWorkerInfoResponse}; use crate::worker::WorkerSessionBuilder; use crate::worker::generated::worker::worker_service_server::{WorkerService, WorkerServiceServer}; use crate::worker::generated::worker::{ @@ -8,7 +7,8 @@ use crate::worker::impl_execute_task::execute_remote_task; use crate::worker::single_write_multi_read::SingleWriteMultiRead; use crate::worker::task_data::TaskData; use crate::{ - DefaultSessionBuilder, ObservabilityServiceImpl, ObservabilityServiceServer, WorkerResolver, + DefaultSessionBuilder, GetWorkerInfoRequest, GetWorkerInfoResponse, ObservabilityServiceImpl, + ObservabilityServiceServer, WorkerResolver, }; use arrow_flight::FlightData; use async_trait::async_trait;