Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod on_drop_stream;
mod once_lock;
mod recursion;
mod task_context_helpers;
mod time;
mod uuid;

pub(crate) use children_helpers::require_one_child;
Expand All @@ -12,4 +13,5 @@ pub(crate) use on_drop_stream::on_drop_stream;
pub(crate) use once_lock::OnceLockResult;
pub(crate) use recursion::TreeNodeExt;
pub(crate) use task_context_helpers::task_ctx_with_extension;
pub(crate) use time::now_nanos;
pub(crate) use uuid::{deserialize_uuid, serialize_uuid};
9 changes: 9 additions & 0 deletions src/common/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use std::time::{SystemTime, UNIX_EPOCH};

/// Returns the number of nanoseconds since Unix epoch.
pub(crate) fn now_nanos() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_nanos() as u64)
.unwrap_or(0)
}
14 changes: 5 additions & 9 deletions src/coordinator/task_spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::coordinator::MetricsStore;
use crate::passthrough_headers::get_passthrough_headers;
use crate::protobuf::tonic_status_to_datafusion_error;
use crate::stage::LocalStage;
use crate::work_unit_feed::{build_work_unit_msg, set_work_unit_send_time};
use crate::worker::generated::worker as pb;
use crate::worker::generated::worker::coordinator_to_worker_msg::Inner;
use crate::worker::generated::worker::set_plan_request::WorkUnitFeedDeclaration;
Expand Down Expand Up @@ -166,8 +167,9 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> {
let request = Request::from_parts(
MetadataMap::from_headers(headers),
Extensions::default(),
futures::stream::once(async { msg })
.chain(UnboundedReceiverStream::new(coordinator_to_worker_rx)),
futures::stream::once(async { msg }).chain(
UnboundedReceiverStream::new(coordinator_to_worker_rx).map(set_work_unit_send_time),
),
);

let metrics = self.metrics.clone();
Expand Down Expand Up @@ -275,13 +277,7 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> {
// so they must be encoded in order to send them over the wire.
while let Some(data_or_err) = work_unit_feed.next().await {
if tx
.send(pb::CoordinatorToWorkerMsg {
inner: Some(Inner::WorkUnit(pb::WorkUnit {
id: serialize_uuid(&id),
partition: partition as u64,
body: data_or_err?.encode_to_bytes(),
})),
})
.send(build_work_unit_msg(&id, partition, data_or_err?))
.is_err()
{
break; // channel closed.
Expand Down
30 changes: 25 additions & 5 deletions src/metrics/latency_metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ impl MinLatencyMetric {

pub fn add_duration(&self, duration: Duration) {
let more_nanos = duration.as_nanos() as usize;
self.nanos.fetch_min(more_nanos.max(1), Relaxed);
self.add_nanos(more_nanos);
}

pub fn add_nanos(&self, nanos: usize) {
self.nanos.fetch_min(nanos.max(1), Relaxed);
}
}

Expand Down Expand Up @@ -191,7 +195,11 @@ impl MaxLatencyMetric {

pub fn add_duration(&self, duration: Duration) {
let more_nanos = duration.as_nanos() as usize;
self.nanos.fetch_max(more_nanos.max(1), Relaxed);
self.add_nanos(more_nanos);
}

pub fn add_nanos(&self, nanos: usize) {
self.nanos.fetch_max(nanos.max(1), Relaxed);
}
}

Expand Down Expand Up @@ -261,7 +269,11 @@ impl AvgLatencyMetric {

pub fn add_duration(&self, duration: Duration) {
let more_nanos = duration.as_nanos() as usize;
self.nanos_sum.fetch_add(more_nanos.max(1), Relaxed);
self.add_nanos(more_nanos);
}

pub fn add_nanos(&self, nanos: usize) {
self.nanos_sum.fetch_add(nanos.max(1), Relaxed);
self.count.fetch_add(1, Relaxed);
}
}
Expand Down Expand Up @@ -326,6 +338,10 @@ impl FirstLatencyMetric {

pub fn add_duration(&self, duration: Duration) {
let nanos = duration.as_nanos() as usize;
self.add_nanos(nanos);
}

pub fn add_nanos(&self, nanos: usize) {
// compare_exchange: only set if still at the sentinel value (0).
let _ = self
.nanos
Expand Down Expand Up @@ -424,8 +440,12 @@ macro_rules! percentile_latency_metric {
}

pub fn add_duration(&self, duration: Duration) {
let nanos = (duration.as_nanos() as usize).max(1) as f64;
self.inner.lock().unwrap().add(nanos);
let nanos = (duration.as_nanos() as usize).max(1);
self.add_nanos(nanos);
}

pub fn add_nanos(&self, nanos: usize) {
self.inner.lock().unwrap().add(nanos as f64);
}
}

Expand Down
11 changes: 2 additions & 9 deletions src/test_utils/work_unit_file_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,11 @@ pub struct FileScanWorkUnitProvider {
/// `None` slots are sent as empty work units so the worker emits an empty
/// stream for that partition.
file_groups: Vec<FileGroup>,
metrics: ExecutionPlanMetricsSet,
}

impl FileScanWorkUnitProvider {
pub fn new(file_groups: Vec<FileGroup>) -> Self {
Self {
file_groups,
metrics: ExecutionPlanMetricsSet::new(),
}
Self { file_groups }
}
}

Expand Down Expand Up @@ -208,10 +204,7 @@ impl DataSource for WorkUnitFileScanConfig {
}

fn metrics(&self) -> ExecutionPlanMetricsSet {
self.feed
.inner()
.map(|p| p.metrics.clone())
.unwrap_or_default()
self.feed.metrics()
}

fn try_swapping_with_projection(
Expand Down
5 changes: 4 additions & 1 deletion src/work_unit_feed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ mod work_unit_feed;
mod work_unit_feed_provider;
mod work_unit_feed_registry;

pub(crate) use remote_work_unit_feed::RemoteWorkUnitFeedRegistry;
pub(crate) use remote_work_unit_feed::{
RemoteWorkUnitFeedRegistry, build_work_unit_msg, set_work_unit_received_time,
set_work_unit_send_time,
};
pub(crate) use work_unit_feed_registry::{WorkUnitFeedRegistry, set_distributed_work_unit_feed};

pub use work_unit::WorkUnit;
Expand Down
121 changes: 110 additions & 11 deletions src/work_unit_feed/remote_work_unit_feed.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
use crate::WorkUnitFeedProvider;
use datafusion::common::{HashMap, exec_err};
use crate::common::{now_nanos, serialize_uuid};
use crate::worker::generated::worker as pb;
use crate::{BytesMetricExt, LatencyMetricExt, WorkUnit};
use datafusion::common::{HashMap, Result, exec_err};
use datafusion::execution::TaskContext;
use datafusion::physical_expr_common::metrics::MetricBuilder;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_proto::protobuf::proto_error;
use futures::StreamExt;
use futures::stream::BoxStream;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use uuid::Uuid;

pub(crate) type WorkUnitTx = UnboundedSender<datafusion::common::Result<Vec<u8>>>;
pub(crate) type WorkUnitRx = UnboundedReceiver<datafusion::common::Result<Vec<u8>>>;
pub(crate) type WorkUnitTx = UnboundedSender<Result<pb::WorkUnit>>;
pub(crate) type WorkUnitRx = UnboundedReceiver<Result<pb::WorkUnit>>;
pub(crate) type RemoteWorkUnitFeedRxs = HashMap<(Uuid, usize), Mutex<Option<WorkUnitRx>>>;
pub(crate) type RemoteWorkUnitFeedTxs = HashMap<(Uuid, usize), WorkUnitTx>;

Expand Down Expand Up @@ -42,6 +47,50 @@ impl RemoteWorkUnitFeedRegistry {
}
}

pub(crate) fn build_work_unit_msg(
id: &Uuid,
partition: usize,
work_unit: Box<dyn WorkUnit>,
) -> pb::CoordinatorToWorkerMsg {
pb::CoordinatorToWorkerMsg {
inner: Some(pb::coordinator_to_worker_msg::Inner::WorkUnit(
pb::WorkUnit {
id: serialize_uuid(id),
partition: partition as u64,
body: work_unit.encode_to_bytes(),
created_timestamp_unix_nanos: now_nanos(),
sent_timestamp_unix_nanos: 0,
received_timestamp_unix_nanos: 0,
processed_timestamp_unix_nanos: 0,
},
)),
}
}

pub(crate) fn set_work_unit_send_time(
mut msg: pb::CoordinatorToWorkerMsg,
) -> pb::CoordinatorToWorkerMsg {
if let pb::CoordinatorToWorkerMsg {
inner: Some(pb::coordinator_to_worker_msg::Inner::WorkUnit(work_unit)),
} = &mut msg
{
work_unit.sent_timestamp_unix_nanos = now_nanos();
}
msg
}

pub(crate) fn set_work_unit_received_time(
mut msg: pb::CoordinatorToWorkerMsg,
) -> pb::CoordinatorToWorkerMsg {
if let pb::CoordinatorToWorkerMsg {
inner: Some(pb::coordinator_to_worker_msg::Inner::WorkUnit(work_unit)),
} = &mut msg
{
work_unit.received_timestamp_unix_nanos = now_nanos();
}
msg
}

/// Remove implementation of a [WorkUnitFeedProvider] that pulls [crate::WorkUnit]s coming over
/// the wire from a [RemoteWorkUnitFeedRegistry].
///
Expand All @@ -53,17 +102,31 @@ impl RemoteWorkUnitFeedRegistry {
#[derive(Debug, Clone)]
pub(crate) struct RemoteFeedProvider {
pub(crate) id: Uuid,
pub(crate) metrics: ExecutionPlanMetricsSet,
}

impl WorkUnitFeedProvider for RemoteFeedProvider {
type WorkUnit = Vec<u8>;

fn feed(
impl RemoteFeedProvider {
pub(crate) fn feed<T: WorkUnit + Default>(
&self,
partition: usize,
ctx: Arc<TaskContext>,
) -> datafusion::common::Result<BoxStream<'static, datafusion::common::Result<Self::WorkUnit>>>
{
) -> Result<BoxStream<'static, Result<T>>> {
let bdr = || MetricBuilder::new(&self.metrics);

let bytes_transferred = bdr().bytes_counter("work_unit_bytes");
let msg_count = bdr().global_counter("work_unit_count");
// Track end-to-end network latency distribution for all work units.
let send_latency_max = bdr().max_latency("work_unit_send_latency_max");
let send_latency_p50 = bdr().p50_latency("work_unit_send_latency_p50");

let received_latency_max = bdr().max_latency("work_unit_received_latency_max");
let received_latency_p50 = bdr().p50_latency("work_unit_received_latency_p50");

let processed_latency_max = bdr().max_latency("work_unit_processed_latency_max");
let processed_latency_p50 = bdr().p50_latency("work_unit_processed_latency_p50");

let elapsed_compute = bdr().elapsed_compute(partition);

let Some(rxs) = ctx
.session_config()
.get_extension::<RemoteWorkUnitFeedRxs>()
Expand All @@ -84,6 +147,42 @@ impl WorkUnitFeedProvider for RemoteFeedProvider {
);
};

Ok(UnboundedReceiverStream::new(receiver).boxed())
Ok(UnboundedReceiverStream::new(receiver)
.map(move |work_unit_or_err| {
let mut work_unit = work_unit_or_err?;
let timer = elapsed_compute.timer();
let result = T::decode(work_unit.body.as_slice())
.map_err(|err| proto_error(format!("{err}")));
timer.done();
work_unit.processed_timestamp_unix_nanos = now_nanos();

let pb::WorkUnit {
created_timestamp_unix_nanos: base,
sent_timestamp_unix_nanos,
received_timestamp_unix_nanos,
processed_timestamp_unix_nanos,
body,
..
} = work_unit;

bytes_transferred.add_bytes(body.len());
msg_count.add(1);

send_latency_max.add_nanos((sent_timestamp_unix_nanos - base) as usize);
send_latency_p50.add_nanos((sent_timestamp_unix_nanos - base) as usize);

received_latency_max.add_nanos((received_timestamp_unix_nanos - base) as usize);
received_latency_p50.add_nanos((received_timestamp_unix_nanos - base) as usize);

processed_latency_max.add_nanos((processed_timestamp_unix_nanos - base) as usize);
processed_latency_p50.add_nanos((processed_timestamp_unix_nanos - base) as usize);

result
})
.boxed())
}

pub(crate) fn metrics(&self) -> ExecutionPlanMetricsSet {
self.metrics.clone()
}
}
30 changes: 18 additions & 12 deletions src/work_unit_feed/work_unit_feed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ use crate::common::{deserialize_uuid, serialize_uuid};
use crate::work_unit_feed::remote_work_unit_feed::RemoteFeedProvider;
use datafusion::common::{Result, internal_err};
use datafusion::execution::TaskContext;
use datafusion_proto::protobuf::proto_error;
use futures::StreamExt;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::stream::BoxStream;
use prost::Message;
use std::fmt::Debug;
use std::sync::Arc;
use uuid::Uuid;
Expand Down Expand Up @@ -143,14 +141,14 @@ impl<T: WorkUnitFeedProvider> RemoteOrLocalProvider<T> {
) -> Result<BoxStream<'static, Result<T::WorkUnit>>> {
match self {
Self::Local(local) => local.feed(partition, ctx),
Self::Remote(remote) => Ok(remote
.feed(partition, ctx)?
.map(|bytes_or_err| {
let bytes = bytes_or_err?;
T::WorkUnit::decode(bytes.as_slice())
.map_err(|err| proto_error(format!("{err}")))
})
.boxed()),
Self::Remote(remote) => Ok(remote.feed::<T::WorkUnit>(partition, ctx)?),
}
}

fn metrics(&self) -> ExecutionPlanMetricsSet {
match self {
Self::Local(local) => local.metrics(),
Self::Remote(remote) => remote.metrics(),
}
}
}
Expand Down Expand Up @@ -181,7 +179,10 @@ impl<T: WorkUnitFeedProvider> WorkUnitFeed<T> {
let id = deserialize_uuid(&proto.id)?;
Ok(WorkUnitFeed {
id,
provider: RemoteOrLocalProvider::Remote(RemoteFeedProvider { id }),
provider: RemoteOrLocalProvider::Remote(RemoteFeedProvider {
id,
metrics: ExecutionPlanMetricsSet::new(),
}),
})
}

Expand Down Expand Up @@ -273,4 +274,9 @@ impl<T: WorkUnitFeedProvider> WorkUnitFeed<T> {
) -> Result<BoxStream<'static, Result<T::WorkUnit>>> {
self.provider.feed(partition, ctx)
}

/// DataFusion metrics collected at runtime while streaming [WorkUnit]s through [Self::feed].
pub fn metrics(&self) -> ExecutionPlanMetricsSet {
self.provider.metrics()
}
}
6 changes: 6 additions & 0 deletions src/work_unit_feed/work_unit_feed_provider.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::WorkUnit;
use datafusion::common::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::stream::BoxStream;
use std::fmt::Debug;
use std::sync::Arc;
Expand Down Expand Up @@ -81,6 +82,11 @@ pub trait WorkUnitFeedProvider: Send + Sync + Debug {
partition: usize,
ctx: Arc<TaskContext>,
) -> Result<BoxStream<'static, Result<Self::WorkUnit>>>;

/// DataFusion metrics collected at runtime while streaming [WorkUnit]s through [Self::feed].
fn metrics(&self) -> ExecutionPlanMetricsSet {
ExecutionPlanMetricsSet::new()
}
}

/// Provides contextual information about where a [WorkUnitFeedProvider] is being executed. When
Expand Down
Loading