diff --git a/benchmarks/cdk/bin/datafusion-bench.ts b/benchmarks/cdk/bin/datafusion-bench.ts index ba5e0b2a..bf36d9b1 100644 --- a/benchmarks/cdk/bin/datafusion-bench.ts +++ b/benchmarks/cdk/bin/datafusion-bench.ts @@ -24,6 +24,8 @@ async function main() { .option('--max-tasks-per-stage ', 'Max tasks per stage', '0') .option('--repartition-file-min-size ', 'repartition_file_min_size DF option', '10485760' /* upstream default */) .option('--target-partitions ', 'target_partitions DF option', '8') + .option('--dynamic ', 'Use the dynamic task count assigner', 'false') + .option('--bytes-per-partition-per-second ', 'Target throughput in bytes per partition per second for the dynamic task count allocator', `${64 * 1024 * 1024}`) .option('--queries ', 'Specific queries to run', undefined) .option('--debug ', 'Print the generated plans to stdout') .option('--warmup ', 'Perform a warmup query before the benchmarks', 'true') @@ -46,6 +48,8 @@ async function main() { const childrenIsolatorUnions = options.childrenIsolatorUnions === 'true' || options.childrenIsolatorUnions === 1 const broadcastJoins = options.broadcastJoins === 'true' || options.broadcastJoins === 1 const partialReduce = options.partialReduce === 'true' || options.partialReduce === 1 + const dynamicTaskCount = options.dynamic === 'true' || options.dynamic === 1 + const bytesPerPartitionPerSecond = parseInt(options.bytesPerPartitionPerSecond) const debug = options.debug === true || options.debug === 'true' || options.debug === 1 const warmup = options.warmup === true || options.warmup === 'true' || options.warmup === 1 @@ -59,6 +63,8 @@ async function main() { compression, broadcastJoins, partialReduce, + dynamicTaskCount, + bytesPerPartitionPerSecond, maxTasksPerStage, repartitionFileMinSize, targetPartitions @@ -98,6 +104,8 @@ class DataFusionRunner implements BenchmarkRunner { childrenIsolatorUnions: boolean; broadcastJoins: boolean; partialReduce: boolean; + dynamicTaskCount: boolean; + 
bytesPerPartitionPerSecond: number; maxTasksPerStage: number; repartitionFileMinSize: number; targetPartitions: number; @@ -177,6 +185,8 @@ class DataFusionRunner implements BenchmarkRunner { SET distributed.children_isolator_unions=${this.options.childrenIsolatorUnions}; SET distributed.broadcast_joins=${this.options.broadcastJoins}; SET distributed.partial_reduce=${this.options.partialReduce}; + SET distributed.dynamic_task_count=${this.options.dynamicTaskCount}; + SET distributed.bytes_per_partition_per_second=${this.options.bytesPerPartitionPerSecond}; SET distributed.max_tasks_per_stage=${this.options.maxTasksPerStage}; SET datafusion.optimizer.repartition_file_min_size=${this.options.repartitionFileMinSize}; SET datafusion.execution.target_partitions=${this.options.targetPartitions}; diff --git a/benchmarks/src/run.rs b/benchmarks/src/run.rs index 45836f42..aefbbd63 100644 --- a/benchmarks/src/run.rs +++ b/benchmarks/src/run.rs @@ -24,11 +24,13 @@ use datafusion::common::utils::get_available_parallelism; use datafusion::common::{config_err, exec_err, not_impl_err}; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::SessionStateBuilder; -use datafusion::physical_plan::display::DisplayableExecutionPlan; -use datafusion::physical_plan::{collect, displayable}; +use datafusion::physical_plan::collect; use datafusion::prelude::*; use datafusion_distributed::test_utils::localhost::LocalHostWorkerResolver; -use datafusion_distributed::{DistributedExt, NetworkBoundaryExt, SessionStateBuilderExt, Worker}; +use datafusion_distributed::{ + DistributedExt, DistributedMetricsFormat, NetworkBoundaryExt, SessionStateBuilderExt, Worker, + display_plan_ascii, rewrite_distributed_plan_with_metrics, +}; use datafusion_distributed_benchmarks::datasets::{clickbench, register_tables, tpcds, tpch}; use std::error::Error; use std::fs; @@ -112,6 +114,14 @@ pub struct RunOpt { #[structopt(short = "s", long = "batch-size")] batch_size: Option, + /// Dynamically 
assign tasks to stages based on runtime stats + #[structopt(long = "dynamic")] + dynamic: bool, + + /// Amount of bytes per second each partition is expected to handle during dynamic execution + #[structopt(long = "bps")] + bytes_per_partition_per_second: Option, + /// Activate debug mode to see more details #[structopt(short, long)] debug: bool, @@ -171,7 +181,7 @@ impl RunOpt { } async fn run_local(self) -> Result<()> { - let state = SessionStateBuilder::new() + let mut builder = SessionStateBuilder::new() .with_default_features() .with_config(self.config()?) .with_distributed_worker_resolver(LocalHostWorkerResolver::new(self.workers.clone())) @@ -192,8 +202,12 @@ impl RunOpt { .with_distributed_broadcast_joins(self.broadcast_joins)? .with_distributed_metrics_collection(self.collect_metrics)? .with_distributed_max_tasks_per_stage(self.max_tasks_per_stage)? - .build(); - let ctx = SessionContext::new_with_state(state); + .with_distributed_dynamic_task_count(self.dynamic)?; + if let Some(v) = self.bytes_per_partition_per_second { + builder.set_distributed_bytes_per_partition_per_second(v)?; + } + + let ctx = SessionContext::new_with_state(builder.build()); register_tables(&ctx, &self.get_path()?).await?; println!("Running benchmarks with the following options: {self:?}"); @@ -286,21 +300,8 @@ impl RunOpt { let plan = ctx.sql(sql).await?; let (state, plan) = plan.into_parts(); - if self.debug { - println!("=== Logical plan ===\n{plan}\n"); - } - let plan = state.optimize(&plan)?; - if self.debug { - println!("=== Optimized logical plan ===\n{plan}\n"); - } let physical_plan = state.create_physical_plan(&plan).await?; - if self.debug { - println!( - "=== Physical plan ===\n{}\n", - displayable(physical_plan.as_ref()).indent(true) - ); - } let mut n_tasks = 0; physical_plan.clone().transform_down(|node| { if let Some(node) = node.as_network_boundary() { @@ -310,9 +311,14 @@ impl RunOpt { })?; let result = collect(physical_plan.clone(), state.task_ctx()).await?; if 
self.debug { + let plan = rewrite_distributed_plan_with_metrics( + physical_plan, + DistributedMetricsFormat::Aggregated, + ) + .await?; println!( "=== Physical plan with metrics ===\n{}\n", - DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent(true) + display_plan_ascii(plan.as_ref(), true) ); } Ok((result, n_tasks)) diff --git a/src/coordinator/distributed.rs b/src/coordinator/distributed.rs index de8bc1ac..eda7b388 100644 --- a/src/coordinator/distributed.rs +++ b/src/coordinator/distributed.rs @@ -1,5 +1,7 @@ +use crate::DistributedConfig; use crate::common::{require_one_child, serialize_uuid}; use crate::coordinator::metrics_store::MetricsStore; +use crate::coordinator::prepare_dynamic_plan::prepare_dynamic_plan; use crate::coordinator::prepare_static_plan::prepare_static_plan; use crate::distributed_planner::NetworkBoundaryExt; use crate::worker::generated::worker::TaskKey; @@ -27,22 +29,25 @@ use std::sync::Mutex; /// over the wire. #[derive(Debug)] pub struct DistributedExec { - plan: Arc, - prepared_plan: Arc>>>, + base_plan: Arc, + final_plan: Arc>>>, + head_stage: Arc>>>, metrics: ExecutionPlanMetricsSet, pub(crate) metrics_store: Option>, } pub(super) struct PreparedPlan { pub(super) head_stage: Arc, + pub(super) final_plan: Arc, pub(super) join_set: JoinSet>, } impl DistributedExec { - pub fn new(plan: Arc) -> Self { + pub fn new(base_plan: Arc) -> Self { Self { - plan, - prepared_plan: Arc::new(Mutex::new(None)), + base_plan, + final_plan: Arc::new(Mutex::new(None)), + head_stage: Arc::new(Mutex::new(None)), metrics: ExecutionPlanMetricsSet::new(), metrics_store: None, } @@ -69,7 +74,10 @@ impl DistributedExec { let Some(task_metrics) = &self.metrics_store else { return; }; - let _ = self.plan.apply(|plan| { + let Some(plan) = self.final_plan.lock().unwrap().as_ref().cloned() else { + return; + }; + let _ = plan.apply(|plan| { if let Some(boundary) = plan.as_network_boundary() { let stage = boundary.input_stage(); for i in 
0..stage.task_count() { @@ -95,7 +103,7 @@ impl DistributedExec { /// It is updated on every call to `execute()`. Returns an error if `.execute()` has not been /// called. pub(crate) fn prepared_plan(&self) -> Result> { - self.prepared_plan + self.final_plan .lock() .map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {}", e))? .clone() @@ -103,6 +111,18 @@ impl DistributedExec { internal_datafusion_err!("No prepared plan found. Was execute() called?") }) } + + /// Returns the head stage that was actually executed. Unlike [`Self::prepared_plan`] (which is + /// reconstructed for visualization, with `Stage::Local` boundaries and rebuilt ancestor + /// `Arc`s), this returns the original `Arc` instances whose metrics were populated during + /// execution. + pub(crate) fn head_stage(&self) -> Result> { + self.head_stage + .lock() + .map_err(|e| internal_datafusion_err!("Failed to lock head stage: {}", e))? + .clone() + .ok_or_else(|| internal_datafusion_err!("No head stage found. 
Was execute() called?")) + } } impl DisplayAs for DistributedExec { @@ -121,11 +141,11 @@ impl ExecutionPlan for DistributedExec { } fn properties(&self) -> &Arc { - self.plan.properties() + self.base_plan.properties() } fn children(&self) -> Vec<&Arc> { - vec![&self.plan] + vec![&self.base_plan] } fn with_new_children( @@ -133,8 +153,9 @@ impl ExecutionPlan for DistributedExec { children: Vec>, ) -> Result> { Ok(Arc::new(DistributedExec { - plan: require_one_child(&children)?, - prepared_plan: self.prepared_plan.clone(), + base_plan: require_one_child(&children)?, + final_plan: Arc::new(Mutex::new(None)), + head_stage: Arc::new(Mutex::new(None)), metrics: self.metrics.clone(), metrics_store: self.metrics_store.clone(), })) @@ -155,31 +176,57 @@ impl ExecutionPlan for DistributedExec { ); } - let PreparedPlan { - head_stage, - join_set, - } = prepare_static_plan(&self.plan, &self.metrics, &self.metrics_store, &context)?; - { - let mut guard = self - .prepared_plan - .lock() - .map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {e}"))?; - *guard = Some(head_stage.clone()); - } + let this = Self { + base_plan: Arc::clone(&self.base_plan), + final_plan: Arc::clone(&self.final_plan), + head_stage: Arc::clone(&self.head_stage), + metrics: self.metrics.clone(), + metrics_store: self.metrics_store.clone(), + }; + let mut builder = RecordBatchReceiverStreamBuilder::new(self.schema(), 1); let tx = builder.tx(); // Spawn the task that pulls data from child... builder.spawn(async move { + let d_cfg = DistributedConfig::from_config_options(context.session_config().options())?; + + let PreparedPlan { + head_stage, + final_plan, + join_set, + } = match d_cfg.dynamic_task_count { + false => prepare_static_plan( + &this.base_plan, + &this.metrics, + &this.metrics_store, + &context, + )?, + true => { + prepare_dynamic_plan( + &this.base_plan, + &this.metrics, + &this.metrics_store, + &context, + ) + .await? 
+ } + }; + + this.final_plan + .lock() + .expect("poisoned lock") + .replace(final_plan); + this.head_stage + .lock() + .expect("poisoned lock") + .replace(Arc::clone(&head_stage)); let mut stream = head_stage.execute(partition, context)?; while let Some(msg) = stream.next().await { if tx.send(msg).await.is_err() { break; // channel closed } } - Ok(()) - }); - // ...in parallel to the one that feeds the plan to workers. - builder.spawn(async move { + drop(tx); for res in join_set.join_all().await { res?; } diff --git a/src/coordinator/mod.rs b/src/coordinator/mod.rs index 2aea8442..db8c6d01 100644 --- a/src/coordinator/mod.rs +++ b/src/coordinator/mod.rs @@ -1,5 +1,6 @@ mod distributed; mod metrics_store; +mod prepare_dynamic_plan; mod prepare_static_plan; mod task_spawner; diff --git a/src/coordinator/prepare_dynamic_plan.rs b/src/coordinator/prepare_dynamic_plan.rs new file mode 100644 index 00000000..0f1d1575 --- /dev/null +++ b/src/coordinator/prepare_dynamic_plan.rs @@ -0,0 +1,300 @@ +use crate::common::TreeNodeExt; +use crate::coordinator::MetricsStore; +use crate::coordinator::distributed::PreparedPlan; +use crate::coordinator::task_spawner::{ + CoordinatorToWorkerMetrics, CoordinatorToWorkerTaskSpawner, +}; +use crate::distributed_planner::{ + NetworkBoundaryBuilderResult, get_distributed_task_estimator, inject_network_boundaries, + network_boundary_inject_sampler, network_boundary_scale_input, +}; +use crate::stage::{LocalStage, RemoteStage}; +use crate::worker::generated::worker as pb; +use crate::{ + DistributedCodec, DistributedConfig, NetworkBoundary, NetworkBoundaryExt, NetworkCoalesceExec, + Stage, TaskCountAnnotation, TaskRoutingContext, get_distributed_worker_resolver, +}; +use dashmap::DashMap; +use datafusion::common::runtime::JoinSet; +use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion::common::{Result, exec_err}; +use datafusion::config::ConfigOptions; +use datafusion::execution::TaskContext; +use 
datafusion::physical_expr_common::metrics::ExecutionPlanMetricsSet; +use datafusion::physical_plan::ExecutionPlan; +use futures::{Stream, StreamExt}; +use rand::Rng; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Mutex}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use url::Url; + +pub(super) async fn prepare_dynamic_plan( + base_plan: &Arc, + metrics: &ExecutionPlanMetricsSet, + task_metrics: &Option>, + ctx: &Arc, +) -> Result { + let metrics = CoordinatorToWorkerMetrics::new(metrics); + + let worker_idx = AtomicUsize::new(rand::rng().random_range(0..100)); // TODO + let plans_for_viz = PlanReconstructor::default(); + let outer_join_set = Mutex::new(JoinSet::new()); + + let head_stage = inject_network_boundaries( + Arc::clone(base_plan), + |nb: Arc, cfg: &ConfigOptions| { + let d_cfg = DistributedConfig::from_config_options(cfg)?; + let worker_resolver = get_distributed_worker_resolver(ctx.session_config())?; + let codec = DistributedCodec::new_combined_with_user(ctx.session_config()); + let task_estimator = get_distributed_task_estimator(ctx.session_config())?; + let mut join_set = JoinSet::new(); + let Stage::Local(input_stage) = nb.input_stage() else { + return exec_err!("NetworkBoundary's input stage was in remote mode."); + }; + let mut input_stage = input_stage.clone(); + input_stage.plan = network_boundary_inject_sampler(input_stage.plan)?; + let mut spawner = CoordinatorToWorkerTaskSpawner::new( + &input_stage, + &metrics, + task_metrics, + &codec, + &mut join_set, + )?; + + let urls = worker_resolver.get_urls()?; + let next_url = || urls[(worker_idx.fetch_add(1, SeqCst)) % urls.len()].clone(); + + let routed_urls = match task_estimator.route_tasks(&TaskRoutingContext { + task_ctx: Arc::clone(ctx), + plan: &input_stage.plan, + task_count: input_stage.tasks, + available_urls: &urls, + }) { + Ok(Some(routed_urls)) => routed_urls, + // If the user has not defined custom routing with a 
`route_tasks` implementation, we + // default to round-robin task assignation from a randomized starting point. + Ok(None) => (0..input_stage.tasks).map(|_| next_url()).collect(), + Err(e) => return exec_err!("error routing tasks to workers: {e}"), + }; + + if routed_urls.len() != input_stage.tasks { + return exec_err!( + "number of tasks ({}) was not equal to number of urls ({}) at execution time", + input_stage.tasks, + routed_urls.len() + ); + } + + let mut workers = Vec::with_capacity(input_stage.tasks); + let mut load_info_rxs = Vec::with_capacity(input_stage.tasks); + + let mut url = if input_stage.tasks == 1 { + get_child_stages_urls(&input_stage.plan)? + .iter() + .find_map(|v| match v.len() == 1 { + true => Some(v.first().cloned()), + false => None, + }) + .flatten() + .unwrap_or_else(next_url) + } else { + next_url() + }; + + for i in 0..input_stage.tasks { + workers.push(url.clone()); + // Spawns the task that feeds this subplan to this worker. There will be as + // many as this spawned tasks as workers. + let (tx, worker_rx) = spawner.send_plan_task(Arc::clone(ctx), i, url)?; + load_info_rxs.push({ + let rx = spawner.load_info_and_metrics_collection_task(i, worker_rx); + // Tag each LoadInfoBatch with the producer task index so + // `calculate_task_count` can identify (task_idx, partition) slices + // independently — `select_all` would otherwise collapse them. 
+ UnboundedReceiverStream::new(rx).map(move |batch| (i, batch)) + }); + spawner.work_unit_feed_task(Arc::clone(ctx), i, tx)?; + url = next_url(); + } + + outer_join_set + .lock() + .expect("poisoned lock") + .spawn(async move { + for result in join_set.join_all().await { + result?; + } + Ok(()) + }); + + plans_for_viz.insert(input_stage.num, Arc::clone(&input_stage.plan)); + + let nb = nb.with_input_stage(Stage::Remote(RemoteStage { + query_id: input_stage.query_id, + num: input_stage.num, + workers, + }))?; + + let load_info_stream = futures::stream::select_all(load_info_rxs); + let partitions_per_task = nb.properties().partitioning.partition_count(); + let partitions_remaining = vec![partitions_per_task; input_stage.tasks]; + let bytes_per_partition_per_second = d_cfg.bytes_per_partition_per_second; + + Ok(async move { + let task_count_above = if nb.as_any().is::() { + TaskCountAnnotation::Maximum(1) + } else { + let bps = total_bytes_per_second(load_info_stream, partitions_remaining).await; + let necessary_partitions = bps.div_ceil(bytes_per_partition_per_second); + let expected_tasks = necessary_partitions.div_ceil(partitions_per_task); + TaskCountAnnotation::Desired(expected_tasks) + }; + Ok(NetworkBoundaryBuilderResult { + task_count_above, + network_boundary: nb, + }) + }) + }, + ctx.session_config().options(), + ) + .await?; + Ok(PreparedPlan { + final_plan: plans_for_viz.reconstruct(&head_stage)?, + head_stage, + join_set: std::mem::take(&mut outer_join_set.lock().unwrap()), + }) +} + +#[derive(Default)] +struct PlanReconstructor { + stage_map: DashMap>, +} + +impl PlanReconstructor { + fn insert(&self, stage: usize, plan: Arc) { + self.stage_map.insert(stage, plan); + } + + fn reconstruct(&self, head_stage: &Arc) -> Result> { + let head_stage = Arc::clone(head_stage); + let reconstructed = head_stage.transform_down_with_task_count(1, |plan, tc| { + let Some(nb) = plan.as_network_boundary() else { + return Ok(Transformed::no(plan)); + }; + let input_stage 
= nb.input_stage(); + let Some(plan_for_viz) = self.stage_map.get(&input_stage.num()) else { + return exec_err!( + "Failed to retrieve plan for stage {} for visualization purposes", + input_stage.num() + ); + }; + + let plan_for_viz = network_boundary_scale_input( + Arc::clone(&plan_for_viz), + nb.properties().partitioning.partition_count(), + tc, + )?; + + let nb = nb.with_input_stage(Stage::Local(LocalStage { + query_id: input_stage.query_id(), + num: input_stage.num(), + plan: plan_for_viz, + tasks: input_stage.task_count(), + }))?; + + Ok(Transformed::yes(nb)) + })?; + Ok(reconstructed.data) + } +} + +fn get_child_stages_urls( + plan: &Arc, +) -> Result>> { + let mut result = vec![]; + plan.apply(|plan| { + let Some(nb) = plan.as_network_boundary() else { + return Ok(TreeNodeRecursion::Continue); + }; + + match nb.input_stage() { + Stage::Local(_) => exec_err!("While gathering child stages URLs, one was in local mode. This is a bug in the dynamic task count execution logic, please report it.")?, + Stage::Remote(remote) => result.push(&remote.workers) + } + + Ok(TreeNodeRecursion::Jump) + })?; + + Ok(result) +} + +/// Estimates the bytes per second flowing through a stage by reading sample information. 
+async fn total_bytes_per_second( + mut load_info_stream: impl Stream + Unpin, + mut partitions_remaining: Vec, +) -> usize { + const ANY_SAMPLE_PERCENTAGE: f32 = 0.5; + const BYTES_READY_SAMPLE_PERCENTAGE: f32 = 0.2; + const BYTES_PER_SECOND_SAMPLE_PERCENTAGE: f32 = 0.2; + + fn apply_pct(value: usize, pct: f32) -> usize { + (value as f32 * pct) as usize + } + + let total_partitions = partitions_remaining.iter().sum::(); + let mut partitions_with_bytes_per_second_done = 0; + let mut partitions_with_bytes_ready_done = 0; + let mut partitions_done = 0; + let mut bytes_ready = 0; + let mut bytes_per_second = 0; + + while let Some((task_idx, load_info)) = load_info_stream.next().await { + partitions_remaining[task_idx] -= 1; + bytes_per_second += load_info.bytes_per_second as usize; + bytes_ready += load_info.bytes_ready as usize; + + partitions_with_bytes_per_second_done += (load_info.bytes_per_second > 0) as usize; + partitions_with_bytes_ready_done += (load_info.bytes_ready > 0) as usize; + partitions_done += 1; + + // Short circuit if we collected enough bytes_ready measurements. + if partitions_with_bytes_ready_done + >= apply_pct(total_partitions, BYTES_READY_SAMPLE_PERCENTAGE) + { + break; + } + + // Short circuit if we collected enough bytes_per_second measurements. + if partitions_with_bytes_per_second_done + >= apply_pct(total_partitions, BYTES_PER_SECOND_SAMPLE_PERCENTAGE) + { + break; + } + + // Short circuit early if there's any total reads, regarding of whether they contained + // a bytes read or not. + if partitions_done >= apply_pct(total_partitions, ANY_SAMPLE_PERCENTAGE) { + break; + } + + // Short circuit if there are no further partitions remaining to sample from. 
+ if partitions_remaining.iter().all(|p| *p == 0) { + break; + } + } + + if partitions_done == 0 { + return 0; + } + + bytes_ready *= total_partitions; + bytes_ready /= partitions_done; + + bytes_per_second *= total_partitions; + bytes_per_second /= partitions_done; + + // TODO: it's not really fair to sum this two, as they are different magnitudes + bytes_per_second + bytes_ready +} diff --git a/src/coordinator/prepare_static_plan.rs b/src/coordinator/prepare_static_plan.rs index 3da4c56a..3e73a688 100644 --- a/src/coordinator/prepare_static_plan.rs +++ b/src/coordinator/prepare_static_plan.rs @@ -88,7 +88,7 @@ pub(super) fn prepare_static_plan( // Spawn a task that sends the subplan to the chosen URL. // There will be as many spawned tasks as workers. let (tx, worker_rx) = spawner.send_plan_task(Arc::clone(ctx), i, routed_url)?; - spawner.metrics_collection_task(i, worker_rx); + spawner.load_info_and_metrics_collection_task(i, worker_rx); spawner.work_unit_feed_task(Arc::clone(ctx), i, tx)?; } @@ -102,6 +102,7 @@ pub(super) fn prepare_static_plan( })?; Ok(PreparedPlan { head_stage: prepared.data, + final_plan: Arc::clone(base_plan), join_set, }) } diff --git a/src/coordinator/task_spawner.rs b/src/coordinator/task_spawner.rs index 91ad6bcd..6dfc7f6e 100644 --- a/src/coordinator/task_spawner.rs +++ b/src/coordinator/task_spawner.rs @@ -221,17 +221,19 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { Ok((coordinator_to_worker_tx, worker_to_coordinator_rx)) } - pub(super) fn metrics_collection_task( + pub(super) fn load_info_and_metrics_collection_task( &mut self, task_i: usize, mut worker_to_coordinator_rx: UnboundedReceiver, - ) { + ) -> UnboundedReceiver { let task_key = TaskKey { query_id: serialize_uuid(&self.query_id), stage_id: self.stage_id as u64, task_number: task_i as u64, }; let task_metrics = self.task_metrics.clone(); + let (load_info_tx, load_info_rx) = tokio::sync::mpsc::unbounded_channel(); + let mut load_info_tx_opt = Some(load_info_tx); 
#[allow(clippy::disallowed_methods)] tokio::spawn(async move { while let Some(msg) = worker_to_coordinator_rx.recv().await { @@ -243,9 +245,18 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { task_metrics.insert(task_key.clone(), pre_order_metrics); } } + pb::worker_to_coordinator_msg::Inner::LoadInfo(load_info) => { + if let Some(tx) = &load_info_tx_opt { + let _ = tx.send(load_info); + } + } + pb::worker_to_coordinator_msg::Inner::LoadInfoEos(_) => { + let _ = load_info_tx_opt.take(); + } } } }); + load_info_rx } /// Launches the task that based on the different local [WorkUnitFeedExec] nodes, sends their diff --git a/src/distributed_ext.rs b/src/distributed_ext.rs index af1ff8ec..a2d08da4 100644 --- a/src/distributed_ext.rs +++ b/src/distributed_ext.rs @@ -582,6 +582,27 @@ pub trait DistributedExt: Sized { P: WorkUnitFeedProvider + 'static, P::WorkUnit: 'static, F: Fn(&T) -> Option<&WorkUnitFeed

> + Send + Sync + 'static; + + /// Dynamically allocates tasks to the different stages based on runtime statistics + /// collected during execution. + fn with_distributed_dynamic_task_count(self, enabled: bool) -> Result; + + /// Same as [DistributedExt::with_distributed_dynamic_task_count] but with an in-place mutation. + fn set_distributed_dynamic_task_count(&mut self, enabled: bool) -> Result<(), DataFusionError>; + + /// Target throughput in bytes per partition per second used by the dynamic task count + /// allocator to decide how many tasks to assign to each stage based on runtime statistics. + fn with_distributed_bytes_per_partition_per_second( + self, + bytes_per_partition_per_second: usize, + ) -> Result; + + /// Same as [DistributedExt::with_distributed_bytes_per_partition_per_second] but with an + /// in-place mutation. + fn set_distributed_bytes_per_partition_per_second( + &mut self, + bytes_per_partition_per_second: usize, + ) -> Result<(), DataFusionError>; } impl DistributedExt for SessionConfig { @@ -730,6 +751,21 @@ impl DistributedExt for SessionConfig { }) } + fn set_distributed_dynamic_task_count(&mut self, enabled: bool) -> Result<(), DataFusionError> { + let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?; + d_cfg.dynamic_task_count = enabled; + Ok(()) + } + + fn set_distributed_bytes_per_partition_per_second( + &mut self, + bytes_per_partition_per_second: usize, + ) -> Result<(), DataFusionError> { + let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?; + d_cfg.bytes_per_partition_per_second = bytes_per_partition_per_second; + Ok(()) + } + delegate! { to self { #[call(set_distributed_option_extension)] @@ -812,6 +848,14 @@ impl DistributedExt for SessionConfig { P: WorkUnitFeedProvider + 'static, P::WorkUnit: 'static, F: Fn(&T) -> Option<&WorkUnitFeed

> + Send + Sync + 'static; + + #[call(set_distributed_dynamic_task_count)] + #[expr($?;Ok(self))] + fn with_distributed_dynamic_task_count(mut self, enabled: bool) -> Result; + + #[call(set_distributed_bytes_per_partition_per_second)] + #[expr($?;Ok(self))] + fn with_distributed_bytes_per_partition_per_second(mut self, bytes_per_partition_per_second: usize) -> Result; } } } @@ -923,6 +967,16 @@ impl DistributedExt for SessionStateBuilder { P: WorkUnitFeedProvider + 'static, P::WorkUnit: 'static, F: Fn(&T) -> Option<&WorkUnitFeed

> + Send + Sync + 'static; + + fn set_distributed_dynamic_task_count(&mut self, enabled: bool) -> Result<(), DataFusionError>; + #[call(set_distributed_dynamic_task_count)] + #[expr($?;Ok(self))] + fn with_distributed_dynamic_task_count(mut self, enabled: bool) -> Result; + + fn set_distributed_bytes_per_partition_per_second(&mut self, bytes_per_partition_per_second: usize) -> Result<(), DataFusionError>; + #[call(set_distributed_bytes_per_partition_per_second)] + #[expr($?;Ok(self))] + fn with_distributed_bytes_per_partition_per_second(mut self, bytes_per_partition_per_second: usize) -> Result; } } } @@ -1034,6 +1088,16 @@ impl DistributedExt for SessionState { P: WorkUnitFeedProvider + 'static, P::WorkUnit: 'static, F: Fn(&T) -> Option<&WorkUnitFeed

> + Send + Sync + 'static; + + fn set_distributed_dynamic_task_count(&mut self, enabled: bool) -> Result<(), DataFusionError>; + #[call(set_distributed_dynamic_task_count)] + #[expr($?;Ok(self))] + fn with_distributed_dynamic_task_count(mut self, enabled: bool) -> Result; + + fn set_distributed_bytes_per_partition_per_second(&mut self, bytes_per_partition_per_second: usize) -> Result<(), DataFusionError>; + #[call(set_distributed_bytes_per_partition_per_second)] + #[expr($?;Ok(self))] + fn with_distributed_bytes_per_partition_per_second(mut self, bytes_per_partition_per_second: usize) -> Result; } } } @@ -1145,6 +1209,16 @@ impl DistributedExt for SessionContext { P: WorkUnitFeedProvider + 'static, P::WorkUnit: 'static, F: Fn(&T) -> Option<&WorkUnitFeed

> + Send + Sync + 'static; + + fn set_distributed_dynamic_task_count(&mut self, enabled: bool) -> Result<(), DataFusionError>; + #[call(set_distributed_dynamic_task_count)] + #[expr($?;Ok(self))] + fn with_distributed_dynamic_task_count(self, enabled: bool) -> Result; + + fn set_distributed_bytes_per_partition_per_second(&mut self, bytes_per_partition_per_second: usize) -> Result<(), DataFusionError>; + #[call(set_distributed_bytes_per_partition_per_second)] + #[expr($?;Ok(self))] + fn with_distributed_bytes_per_partition_per_second(self, bytes_per_partition_per_second: usize) -> Result; } } } diff --git a/src/distributed_planner/distributed_config.rs b/src/distributed_planner/distributed_config.rs index 5b3500bc..d5fc4a18 100644 --- a/src/distributed_planner/distributed_config.rs +++ b/src/distributed_planner/distributed_config.rs @@ -65,6 +65,10 @@ extensions_options! { /// budget will still be admitted (otherwise we would livelock), so the actual peak per /// connection is `worker_connection_buffer_budget_bytes + max_message_size`. pub worker_connection_buffer_budget_bytes: usize, default = 64 * 1024 * 1024 + /// TODO + pub dynamic_task_count: bool, default = false + /// TODO + pub bytes_per_partition_per_second: usize, default = 64 * 1024 * 1024 /// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to /// estimate how many tasks should be spawned for the [Stage] containing the leaf node. 
pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default() diff --git a/src/distributed_planner/distributed_query_planner.rs b/src/distributed_planner/distributed_query_planner.rs index 2fd92ab7..a0470451 100644 --- a/src/distributed_planner/distributed_query_planner.rs +++ b/src/distributed_planner/distributed_query_planner.rs @@ -95,6 +95,14 @@ impl QueryPlanner for DistributedQueryPlanner { plan = insert_broadcast_execs(plan, cfg)?; + if d_cfg.dynamic_task_count { + // The task count will be decided dynamically at execution time. + return Ok(Arc::new( + DistributedExec::new(plan).with_metrics_collection(d_cfg.collect_metrics), + )); + } + + // Compute per-node task counts and inject `Network*Exec` nodes at the stage boundaries. plan = inject_network_boundaries(plan, CardinalityBasedNetworkBoundaryBuilder, cfg).await?; plan = prepare_network_boundaries(plan)?; diff --git a/src/distributed_planner/mod.rs b/src/distributed_planner/mod.rs index f0d2ad23..7a5de644 100644 --- a/src/distributed_planner/mod.rs +++ b/src/distributed_planner/mod.rs @@ -10,7 +10,11 @@ mod session_state_builder_ext; mod task_estimator; pub use distributed_config::DistributedConfig; +pub(crate) use inject_network_boundaries::{ + NetworkBoundaryBuilderResult, inject_network_boundaries, +}; pub use network_boundary::{NetworkBoundary, NetworkBoundaryExt}; +pub(crate) use network_boundary::{network_boundary_inject_sampler, network_boundary_scale_input}; pub use session_state_builder_ext::SessionStateBuilderExt; pub(crate) use task_estimator::set_distributed_task_estimator; pub use task_estimator::{TaskCountAnnotation, TaskEstimation, TaskEstimator, TaskRoutingContext}; diff --git a/src/distributed_planner/network_boundary.rs b/src/distributed_planner/network_boundary.rs index fec385fc..98d01a41 100644 --- a/src/distributed_planner/network_boundary.rs +++ b/src/distributed_planner/network_boundary.rs @@ -11,7 +11,7 @@ pub trait NetworkBoundary: 
ExecutionPlan { /// information to perform any internal transformations necessary for distributed execution. /// /// Typically, [NetworkBoundary]s will use this call for transitioning from "Pending" to "ready". - fn with_input_stage(&self, input_stage: Stage) -> Result>; + fn with_input_stage(&self, input_stage: Stage) -> Result>; /// Returns the assigned input [Stage], if any. fn input_stage(&self) -> &Stage; @@ -76,3 +76,22 @@ pub(crate) fn network_boundary_scale_input( Ok(input) } + +pub(crate) fn network_boundary_inject_sampler( + input: Arc, +) -> Result> { + let transformed = NetworkShuffleExec::inject_sampler(Arc::clone(&input))?; + if transformed.transformed { + return Ok(transformed.data); + } + let transformed = NetworkBroadcastExec::inject_sampler(Arc::clone(&input))?; + if transformed.transformed { + return Ok(transformed.data); + } + let transformed = NetworkCoalesceExec::inject_sampler(Arc::clone(&input))?; + if transformed.transformed { + return Ok(transformed.data); + } + + Ok(input) +} diff --git a/src/execution_plans/mod.rs b/src/execution_plans/mod.rs index a1ea6316..ecdcbc35 100644 --- a/src/execution_plans/mod.rs +++ b/src/execution_plans/mod.rs @@ -6,6 +6,7 @@ mod metrics; mod network_broadcast; mod network_coalesce; mod network_shuffle; +mod sampler; #[cfg(any(test, feature = "integration"))] pub mod benchmarks; @@ -18,3 +19,4 @@ pub(crate) use metrics::MetricsWrapperExec; pub use network_broadcast::NetworkBroadcastExec; pub use network_coalesce::NetworkCoalesceExec; pub use network_shuffle::NetworkShuffleExec; +pub use sampler::SamplerExec; diff --git a/src/execution_plans/network_broadcast.rs b/src/execution_plans/network_broadcast.rs index 64b06e8f..31303bdc 100644 --- a/src/execution_plans/network_broadcast.rs +++ b/src/execution_plans/network_broadcast.rs @@ -1,5 +1,6 @@ use crate::common::require_one_child; use crate::distributed_planner::NetworkBoundary; +use crate::execution_plans::SamplerExec; use crate::stage::{LocalStage, Stage}; 
use crate::worker::WorkerConnectionPool; use crate::{BroadcastExec, DistributedTaskContext}; @@ -139,6 +140,18 @@ impl NetworkBroadcastExec { )))) } + pub(crate) fn inject_sampler( + plan: Arc, + ) -> Result>> { + let Some(broadcast_exec) = plan.as_any().downcast_ref::() else { + return Ok(Transformed::no(plan)); + }; + + let child = require_one_child(broadcast_exec.children())?; + plan.with_new_children(vec![Arc::new(SamplerExec::new(child))]) + .map(Transformed::yes) + } + pub(crate) fn from_stage(input_stage: LocalStage) -> Self { let input_partition_count = input_stage.plan.properties().partitioning.partition_count(); let properties = Arc::new( @@ -173,7 +186,7 @@ impl NetworkBroadcastExec { } impl NetworkBoundary for NetworkBroadcastExec { - fn with_input_stage(&self, input_stage: Stage) -> Result> { + fn with_input_stage(&self, input_stage: Stage) -> Result> { let mut self_clone = self.clone(); self_clone.worker_connections = WorkerConnectionPool::new(input_stage.task_count()); self_clone.input_stage = input_stage; @@ -248,7 +261,8 @@ impl ExecutionPlan for NetworkBroadcastExec { }; let task_context = DistributedTaskContext::from_ctx(&context); - let off = self.properties.partitioning.partition_count() * task_context.task_index; + let out_partitions = self.properties.partitioning.partition_count(); + let off = out_partitions * task_context.task_index; let mut streams = Vec::with_capacity(self.input_stage.task_count()); for input_task_index in 0..self.input_stage.task_count() { @@ -256,6 +270,8 @@ impl ExecutionPlan for NetworkBroadcastExec { remote_stage, off..(off + self.properties.partitioning.partition_count()), input_task_index, + out_partitions, + task_context.task_count, &context, )?; diff --git a/src/execution_plans/network_coalesce.rs b/src/execution_plans/network_coalesce.rs index dc489367..dcb7b65e 100644 --- a/src/execution_plans/network_coalesce.rs +++ b/src/execution_plans/network_coalesce.rs @@ -92,6 +92,14 @@ impl NetworkCoalesceExec { 
Ok(Transformed::no(plan)) } + /// Does nothing, but it's here for explicitly stating that this network boundary does not support + /// input stage sampling. + pub(crate) fn inject_sampler( + plan: Arc, + ) -> Result>> { + Ok(Transformed::no(plan)) + } + pub(crate) fn from_stage(input_stage: LocalStage, consumer_tasks: usize) -> Self { // Each output task coalesces a group of input tasks. We size the output partition count // per output task based on the maximum group size, returning empty streams for tasks with @@ -173,7 +181,7 @@ impl NetworkBoundary for NetworkCoalesceExec { &self.input_stage } - fn with_input_stage(&self, input_stage: Stage) -> Result> { + fn with_input_stage(&self, input_stage: Stage) -> Result> { let mut self_clone = self.clone(); self_clone.properties = scale_partitioning_props(self_clone.properties(), |p| { p * input_stage.task_count() / self_clone.input_stage.task_count().max(1) @@ -249,10 +257,8 @@ impl ExecutionPlan for NetworkCoalesceExec { ); } - let partitions_per_task = self - .properties() - .partitioning - .partition_count() + let out_partitions = self.properties().partitioning.partition_count(); + let partitions_per_task = out_partitions .checked_div( self.input_stage .task_count() @@ -298,6 +304,8 @@ impl ExecutionPlan for NetworkCoalesceExec { remote_stage, 0..partitions_per_task, target_task, + out_partitions, + task_context.task_count, &context, )?; diff --git a/src/execution_plans/network_shuffle.rs b/src/execution_plans/network_shuffle.rs index 00c02138..0b245913 100644 --- a/src/execution_plans/network_shuffle.rs +++ b/src/execution_plans/network_shuffle.rs @@ -1,4 +1,5 @@ use crate::common::require_one_child; +use crate::execution_plans::SamplerExec; use crate::execution_plans::common::scale_partitioning; use crate::stage::{LocalStage, Stage}; use crate::worker::WorkerConnectionPool; @@ -126,6 +127,18 @@ impl NetworkShuffleExec { Ok(Transformed::new(scaled, true, TreeNodeRecursion::Stop)) } + pub(crate) fn inject_sampler( + 
plan: Arc, + ) -> Result>> { + let Some(repartition_exec) = plan.as_any().downcast_ref::() else { + return Ok(Transformed::no(plan)); + }; + + let child = require_one_child(repartition_exec.children())?; + plan.with_new_children(vec![Arc::new(SamplerExec::new(child))]) + .map(Transformed::yes) + } + pub(crate) fn from_stage(input_stage: LocalStage) -> Self { Self { properties: input_stage.plan.properties().clone(), @@ -161,7 +174,7 @@ impl NetworkBoundary for NetworkShuffleExec { &self.input_stage } - fn with_input_stage(&self, input_stage: Stage) -> Result> { + fn with_input_stage(&self, input_stage: Stage) -> Result> { let mut self_clone = self.clone(); self_clone.worker_connections = WorkerConnectionPool::new(input_stage.task_count()); self_clone.input_stage = input_stage; @@ -226,7 +239,8 @@ impl ExecutionPlan for NetworkShuffleExec { }; let task_context = DistributedTaskContext::from_ctx(&context); - let off = self.properties.partitioning.partition_count() * task_context.task_index; + let out_partitions = self.properties.partitioning.partition_count(); + let off = out_partitions * task_context.task_index; let mut streams = Vec::with_capacity(remote_stage.workers.len()); for input_task_index in 0..remote_stage.workers.len() { @@ -234,6 +248,8 @@ impl ExecutionPlan for NetworkShuffleExec { remote_stage, off..(off + self.properties.partitioning.partition_count()), input_task_index, + out_partitions, + task_context.task_count, &context, )?; diff --git a/src/execution_plans/sampler.rs b/src/execution_plans/sampler.rs new file mode 100644 index 00000000..4e4faaa3 --- /dev/null +++ b/src/execution_plans/sampler.rs @@ -0,0 +1,443 @@ +use crate::common::require_one_child; +use crate::worker::generated::worker as pb; +use crate::{ + BytesCounterMetric, BytesMetricExt, GaugeMetricExt, LatencyMetricExt, MaxGaugeMetric, + MaxLatencyMetric, P50LatencyMetric, +}; +use datafusion::arrow::array::Array; +use datafusion::arrow::record_batch::RecordBatch; +use 
datafusion::common::runtime::SpawnedTask; +use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion}; +use datafusion::common::{DataFusionError, Result, exec_err}; +use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_expr_common::metrics::{Gauge, MetricValue, MetricsSet}; +use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use std::any::Any; +use std::collections::VecDeque; +use std::fmt::{Debug, Formatter}; +use std::pin::Pin; +use std::sync::{Arc, LazyLock, Mutex, OnceLock}; +use std::task::{Context, Poll}; +use std::time::Instant; +use tokio::sync::oneshot; + +/// How many [RecordBatch]s to allow the input stream to yield synchronously (without yielding back +/// to tokio) before short-circuiting buffering. +const READY_CHUNK_LIMIT: usize = 256; +/// Maximum read of bytes per second allowed to be emitted. Reads greater than this will be +/// truncated to this max value, as it's assumed that [READY_CHUNK_LIMIT] was hit and no useful +/// measurement can actually be emitted. +const MAX_BYTES_PER_SECOND: usize = 512 * 1024 * 1024; + +#[derive(Debug)] +pub struct SamplerExec { + pub(crate) input: Arc, + pub(crate) metric_set: ExecutionPlanMetricsSet, + pub(crate) partition_samplers: Vec, +} + +/// Metrics that quantify how long the sampler held data in memory before the consumer +/// (real execution) attached, plus the peak buffer size reached. All metrics are shared +/// across the partition samplers; the latency metrics aggregate per-partition observations. 
+#[derive(Debug, Clone)] +pub(crate) struct SamplerExecMetrics { + /// Time since [SamplerExec::kick_off_first_sampler] was called until the first batch from + /// the input arrived. + kick_off_to_fist_batch_p50: P50LatencyMetric, + kick_off_to_fist_batch_max: MaxLatencyMetric, + /// Time since [SamplerExec::kick_off_first_sampler] was called until the [pb::LoadInfo] message + /// was sent. + kick_off_to_load_info_sent_p50: P50LatencyMetric, + kick_off_to_load_info_sent_max: MaxLatencyMetric, + /// Time since [SamplerExec::kick_off_first_sampler] was called until the consumer called + /// `execute()` on the sampler and took over the buffered stream. + kick_off_to_execution_p50: P50LatencyMetric, + kick_off_to_execution_max: MaxLatencyMetric, + /// Maximum number of record batches buffered by a sampler. + max_batches_buffered: MaxGaugeMetric, + /// Peak memory buffered by any partition sampler during the sampling phase. + max_mem_used: Gauge, + /// Bytes per second flowing through the sampler node. + bytes_per_sec: BytesCounterMetric, + /// Bytes ready at the moment of reporting load info. + bytes_ready: BytesCounterMetric, + /// Elapsed compute while sampling. 
+ elapsed_compute: Time, +} + +impl SamplerExecMetrics { + fn new(metric_set: &ExecutionPlanMetricsSet) -> Self { + let bdr = || MetricBuilder::new(metric_set); + Self { + kick_off_to_fist_batch_p50: bdr().p50_latency("kick_off_to_first_batch_p50"), + kick_off_to_fist_batch_max: bdr().max_latency("kick_off_to_first_batch_max"), + kick_off_to_load_info_sent_p50: bdr().p50_latency("kick_off_to_load_info_sent_p50"), + kick_off_to_load_info_sent_max: bdr().max_latency("kick_off_to_load_info_sent_max"), + kick_off_to_execution_p50: bdr().p50_latency("kick_off_to_execution_p50"), + kick_off_to_execution_max: bdr().max_latency("kick_off_to_execution_max"), + max_batches_buffered: bdr().max_gauge("max_batches_buffered"), + max_mem_used: bdr().global_gauge("max_mem_used"), + bytes_per_sec: bdr().bytes_counter("bytes_per_sec"), + bytes_ready: bdr().bytes_counter("bytes_ready"), + elapsed_compute: { + let time = Time::new(); + bdr().build(MetricValue::ElapsedCompute(time.clone())); + time + }, + } + } +} + +impl SamplerExec { + pub(crate) fn new(input: Arc) -> Self { + let metric_set = ExecutionPlanMetricsSet::new(); + let metric_set_clone = metric_set.clone(); + // Metrics need to be lazily initialized, otherwise the coordinator side will register + // them when they are never relevant there, they are just relevant in workers. 
+ let metrics: Arc SamplerExecMetrics + Send>>> = + Arc::new(LazyLock::new(Box::new(move || { + SamplerExecMetrics::new(&metric_set_clone) + }))); + let partitions = input.properties().partitioning.partition_count(); + let mut samplers = Vec::with_capacity(partitions); + for i in 0..partitions { + samplers.push(PartitionSampler { + partition_idx: i, + input: Arc::clone(&input), + stream: Mutex::new(None), + metrics: Arc::clone(&metrics), + kick_off_at: Arc::new(OnceLock::new()), + first_batch_at: Arc::new(OnceLock::new()), + load_info_sent_at: Arc::new(OnceLock::new()), + }); + } + Self { + input, + metric_set, + partition_samplers: samplers, + } + } + + pub(crate) fn kick_off_first_sampler( + plan: Arc, + ctx: Arc, + ) -> Result>> { + let mut receivers = vec![]; + plan.apply(|plan| { + let Some(sampler) = plan.as_any().downcast_ref::() else { + return Ok(TreeNodeRecursion::Continue); + }; + receivers.reserve(sampler.partition_samplers.len()); + for partition_sampler in &sampler.partition_samplers { + let rx = partition_sampler.kick_off(Arc::clone(&ctx))?; + receivers.push(rx); + } + Ok(TreeNodeRecursion::Stop) + })?; + Ok(receivers) + } +} + +pub(crate) struct PartitionSampler { + partition_idx: usize, + input: Arc, + stream: Mutex>, + + // Metrics state. + metrics: Arc SamplerExecMetrics + Send>>>, + /// Set when `kick_off` is invoked. Used at `execute()` time to record how long the + /// sampler buffered data before the consumer attached. + kick_off_at: Arc>, + /// Set when the producer task receives the first batch from the input. Used at + /// `execute()` time to record the delay between kick-off and the first batch. + first_batch_at: Arc>, + /// Set right after the `LoadInfo` is handed to `sampling_tx.send()`, regardless of + /// whether the send succeeded. Used at `execute()` time to record the delay between + /// kick-off and the `LoadInfo` being sent. 
+ load_info_sent_at: Arc>, +} + +impl Debug for PartitionSampler { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PartitionSampler").finish() + } +} + +impl PartitionSampler { + fn start_stream(&self) -> Option { + let Some(kick_off_at) = self.kick_off_at.get() else { + return self.stream.lock().unwrap().take(); + }; + + // Time since this sampler was kicked off until the first batch arrived. + if let Some(t) = self.first_batch_at.get() { + let delay = t.saturating_duration_since(*kick_off_at); + self.metrics.kick_off_to_fist_batch_p50.add_duration(delay); + self.metrics.kick_off_to_fist_batch_max.add_duration(delay); + } + + // Time since the sampler was kicked off until the pb::LoadInfo message was sent. + if let Some(t) = self.load_info_sent_at.get() { + let delay = t.saturating_duration_since(*kick_off_at); + self.metrics + .kick_off_to_load_info_sent_p50 + .add_duration(delay); + self.metrics + .kick_off_to_load_info_sent_max + .add_duration(delay); + } + + // Time since the sampler was kicked off until it started executing. 
+ let delay = kick_off_at.elapsed(); + self.metrics.kick_off_to_execution_p50.add_duration(delay); + self.metrics.kick_off_to_execution_max.add_duration(delay); + + self.stream.lock().unwrap().take() + } + + fn kick_off(&self, ctx: Arc) -> Result> { + let _ = self.kick_off_at.set(Instant::now()); + let (sampling_tx, sampling_rx) = oneshot::channel(); + + let input = Arc::clone(&self.input); + let partition_idx = self.partition_idx; + let schema = input.schema(); + let elapsed_compute = self.metrics.elapsed_compute.clone(); + let first_batch_at = Arc::clone(&self.first_batch_at); + + let mut reporter = LoadInfoDropHandler { + load_info: pb::LoadInfo { + partition: partition_idx as u64, + bytes_per_second: 0, + bytes_ready: 0, + }, + sampling_tx: Some(sampling_tx), + bytes_per_second_metric: self.metrics.bytes_per_sec.clone(), + load_info_sent_at: Arc::clone(&self.load_info_sent_at), + bytes_ready_metric: self.metrics.bytes_ready.clone(), + }; + + let mut buffer = RecordBatchBuffer { + buffer: VecDeque::new(), + max_mem_used: self.metrics.max_mem_used.clone(), + max_batches_buffered: self.metrics.max_batches_buffered.clone(), + memory_reservation: Arc::new( + MemoryConsumer::new(format!("PartitionSampler[{partition_idx}]")) + .register(ctx.memory_pool()), + ), + first_batch_at: Arc::clone(&self.first_batch_at), + }; + + // Execute the input synchronously so any setup error surfaces before we + // spawn the producer task. + let mut input_stream = input.execute(partition_idx, ctx)?.fuse(); + + let task = SpawnedTask::spawn(async move { + // First, read at once all the RecordBatches that are ready to be yielded synchronously. + // Some downstream nodes will accumulate data in-memory, and will then yield several + // RecordBatches at once synchronously (without Poll::Pending gaps in between). 
+ let mut chunked = (&mut input_stream).ready_chunks(READY_CHUNK_LIMIT); + let Some(batches) = chunked.next().await else { + // Not a single RecordBatch was produced, so let bytes_per_second=0 be sent as-is. + return Ok(buffer.chain(input_stream).boxed()); + }; + let _timer = elapsed_compute.timer(); + for batch in batches { + let _ = first_batch_at.set(Instant::now()); + buffer.push(batch?); + } + + // The downstream node yielded too many RecordBatches synchronously, more than what we + // are willing to buffer here, so assume that there's going to be a massive amount of + // data flowing through here. + if buffer.len() >= READY_CHUNK_LIMIT { + reporter.set_bytes_ready(buffer.bytes_ready()); + reporter.set_bytes_per_second(MAX_BYTES_PER_SECOND); + return Ok(buffer.chain(input_stream).boxed()); + } + + // Poll the input once more without waiting. Three outcomes are possible: + // - The stream is finished: the downstream node spent some time accumulating data and + // then yielded N RecordBatches where N < READY_CHUNK_LIMIT, and there are no more. + // In this case, no data velocity measurement is reported. + // - A RecordBatch (or an error) became ready between polls: it was already taken out + // of the stream, so it must be buffered (or propagated) instead of being dropped. + // - The stream is pending: fall through and measure data velocity below. + let polled = input_stream.next().now_or_never(); + match polled { + Some(None) => { + reporter.set_bytes_ready(buffer.bytes_ready()); + return Ok(buffer.chain(input_stream).boxed()); + } + Some(Some(batch)) => buffer.push(batch?), + None => {} + } + + drop(_timer); + + // Wait for an async gap in order to measure data velocity. + let poll_start = Instant::now(); + let Some(batch) = input_stream.try_next().await? else { + // The last message was somehow the last message in the stream, but the stream did + // not end immediately. This is an unlikely scenario. + reporter.set_bytes_ready(buffer.bytes_ready()); + return Ok(buffer.chain(input_stream).boxed()); + }; + // Clamp the measurement: a near-zero elapsed time yields an infinite rate, which would + // otherwise saturate to usize::MAX when cast. + let bytes_per_second = + ((record_batch_size(&batch) as f32 / poll_start.elapsed().as_secs_f32()) as usize) + .min(MAX_BYTES_PER_SECOND); + + let _timer = elapsed_compute.timer(); + + buffer.push(batch); + + // Some RecordBatches were buffered, but there's more to be yielded, so both + // bytes_per_second and bytes_ready can be reported. 
+ reporter.set_bytes_ready(buffer.bytes_ready()); + reporter.set_bytes_per_second(bytes_per_second); + + Ok(buffer.chain(input_stream).boxed()) + }); + + let stream = async move { + task.await + .map_err(|err| DataFusionError::Internal(err.to_string()))? + } + .try_flatten_stream(); + + self.stream + .lock() + .expect("poisoned lock") + .replace(Box::pin(RecordBatchStreamAdapter::new(schema, stream))); + + Ok(sampling_rx) + } +} + +struct LoadInfoDropHandler { + load_info: pb::LoadInfo, + bytes_ready_metric: BytesCounterMetric, + bytes_per_second_metric: BytesCounterMetric, + sampling_tx: Option>, + load_info_sent_at: Arc>, +} + +impl LoadInfoDropHandler { + fn set_bytes_ready(&mut self, bytes_ready: usize) { + self.load_info.bytes_ready = bytes_ready as u64; + self.bytes_ready_metric.add_bytes(bytes_ready); + } + + fn set_bytes_per_second(&mut self, bytes_per_second: usize) { + self.load_info.bytes_per_second = bytes_per_second as u64; + self.bytes_per_second_metric.add_bytes(bytes_per_second); + } +} + +impl Drop for LoadInfoDropHandler { + fn drop(&mut self) { + if let Some(sampling_tx) = self.sampling_tx.take() { + let _ = sampling_tx.send(self.load_info); + let _ = self.load_info_sent_at.set(Instant::now()); + } + } +} + +struct RecordBatchBuffer { + buffer: VecDeque<(RecordBatch, usize)>, + max_batches_buffered: MaxGaugeMetric, + max_mem_used: Gauge, + memory_reservation: Arc, + first_batch_at: Arc>, +} + +impl RecordBatchBuffer { + fn push(&mut self, batch: RecordBatch) { + let batch_size = record_batch_size(&batch); + if self.buffer.is_empty() { + let _ = self.first_batch_at.set(Instant::now()); + } + self.max_mem_used.add(batch_size); + self.memory_reservation.grow(batch_size); + self.buffer.push_back((batch, batch_size)); + self.max_batches_buffered.set_max(self.buffer.len()); + } + + fn len(&self) -> usize { + self.buffer.len() + } + + fn bytes_ready(&self) -> usize { + self.buffer.iter().map(|(_, size)| *size).sum() + } +} + +impl Stream for 
RecordBatchBuffer { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + match self.as_mut().buffer.pop_front() { + None => Poll::Ready(None), + Some((batch, size)) => { + self.memory_reservation.shrink(size); + Poll::Ready(Some(Ok(batch))) + } + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.buffer.len(), Some(self.buffer.len())) + } +} + +fn record_batch_size(batch: &RecordBatch) -> usize { + let mut result = 0; + for c in batch.columns() { + result += c.to_data().get_slice_memory_size().unwrap_or(0) + } + result +} + +impl DisplayAs for SamplerExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!( + f, + "SamplerExec: partitions={}", + self.partition_samplers.len() + ) + } +} + +impl ExecutionPlan for SamplerExec { + fn name(&self) -> &str { + "SamplerExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &Arc { + self.input.properties() + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(Self::new(require_one_child(children)?))) + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> Result { + let Some(stream) = self.partition_samplers[partition].start_stream() else { + return exec_err!("SamplerExec[{partition}] was not kicked off"); + }; + Ok(stream) + } + + fn metrics(&self) -> Option { + Some(self.metric_set.clone_inner()) + } +} diff --git a/src/metrics/task_metrics_rewriter.rs b/src/metrics/task_metrics_rewriter.rs index 79425c22..f323c884 100644 --- a/src/metrics/task_metrics_rewriter.rs +++ b/src/metrics/task_metrics_rewriter.rs @@ -18,7 +18,6 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::internal_err; use datafusion::physical_plan::metrics::{Label, Metric, MetricsSet}; use std::sync::Arc; -use std::vec; /// Format to use when displaying metrics for a distributed plan. 
#[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -51,27 +50,23 @@ pub async fn rewrite_distributed_plan_with_metrics( return Ok(plan); }; - // Check that the plan was executed before waiting — if not, prepared_plan() returns an - // error immediately rather than waiting forever for metrics that will never arrive. - let prepared = distributed_exec.prepared_plan()?; - distributed_exec.wait_for_metrics().await; let Some(metrics_collection) = distributed_exec.metrics_store.clone() else { return Ok(plan); }; - let task_metrics = collect_plan_metrics(&prepared)?; + let head_stage = distributed_exec.head_stage()?; + let task_metrics = collect_plan_metrics(&head_stage)?; // Rewrite the DistributedExec's child plan with metrics. let dist_exec_plan_with_metrics = rewrite_local_plan_with_metrics( format.to_rewrite_ctx(0), // Task id is 0 for the DistributedExec plan - plan.children()[0].clone(), + distributed_exec.prepared_plan()?, task_metrics, )?; - let plan = plan.with_new_children(vec![dist_exec_plan_with_metrics])?; - let transformed = plan.transform_down(|plan| { + let transformed = dist_exec_plan_with_metrics.transform_down(|plan| { // Transform all stages using NetworkShuffleExec and NetworkCoalesceExec as barriers. if let Some(network_boundary) = plan.as_network_boundary() { let Stage::Local(stage) = network_boundary.input_stage() else { @@ -94,7 +89,7 @@ pub async fn rewrite_distributed_plan_with_metrics( Ok(Transformed::no(plan)) })?; - Ok(transformed.data) + plan.with_new_children(vec![transformed.data]) } /// Extra information for rewriting local plans. 
diff --git a/src/observability/service.rs b/src/observability/service.rs index 06827dca..7b7c5473 100644 --- a/src/observability/service.rs +++ b/src/observability/service.rs @@ -96,7 +96,7 @@ impl ObservabilityService for ObservabilityServiceImpl { let total_partitions = task_data.total_partitions() as u64; let remaining = task_data.num_partitions_remaining() as u64; let completed_partitions = total_partitions.saturating_sub(remaining); - let output_rows = output_rows_from_plan(&task_data.plan); + let output_rows = output_rows_from_plan(&task_data.base_plan); tasks.push(TaskProgress { task_key: Some((*internal_key).clone()), diff --git a/src/protobuf/distributed_codec.rs b/src/protobuf/distributed_codec.rs index c6c03e55..e4496c12 100644 --- a/src/protobuf/distributed_codec.rs +++ b/src/protobuf/distributed_codec.rs @@ -1,9 +1,9 @@ use super::get_distributed_user_codecs; use crate::NetworkShuffleExec; -use crate::common::{deserialize_uuid, serialize_uuid}; +use crate::common::{deserialize_uuid, require_one_child, serialize_uuid}; use crate::execution_plans::{ BroadcastExec, ChildWeight, ChildrenIsolatorUnionExec, NetworkBroadcastExec, - NetworkCoalesceExec, + NetworkCoalesceExec, SamplerExec, }; use crate::stage::{LocalStage, RemoteStage, Stage}; use crate::worker::WorkerConnectionPool; @@ -232,6 +232,9 @@ impl PhysicalExtensionCodec for DistributedCodec { .collect(), })) } + DistributedExecNode::Sampler(SamplerExecProto {}) => { + Ok(Arc::new(SamplerExec::new(require_one_child(inputs)?))) + } } } @@ -348,6 +351,14 @@ impl PhysicalExtensionCodec for DistributedCodec { node: Some(DistributedExecNode::ChildrenIsolatorUnion(inner)), }; + wrapper.encode(buf).map_err(|e| proto_error(format!("{e}"))) + } else if let Some(_node) = node.as_any().downcast_ref::() { + let inner = SamplerExecProto {}; + + let wrapper = DistributedExecProto { + node: Some(DistributedExecNode::Sampler(inner)), + }; + wrapper.encode(buf).map_err(|e| proto_error(format!("{e}"))) } else { 
Err(proto_error(format!("Unexpected plan {}", node.name()))) @@ -379,7 +390,7 @@ pub struct ExecutionTaskProto { #[derive(Clone, PartialEq, ::prost::Message)] pub struct DistributedExecProto { - #[prost(oneof = "DistributedExecNode", tags = "1, 2, 3, 4, 5, 6")] + #[prost(oneof = "DistributedExecNode", tags = "1, 2, 3, 4, 5, 6, 7")] pub node: Option, } @@ -396,6 +407,8 @@ pub enum DistributedExecNode { NetworkBroadcast(NetworkBroadcastExecProto), #[prost(message, tag = "6")] Broadcast(BroadcastExecProto), + #[prost(message, tag = "7")] + Sampler(SamplerExecProto), } /// Protobuf representation of the [NetworkShuffleExec] physical node. It serves as @@ -508,6 +521,9 @@ pub struct BroadcastExecProto { pub consumer_task_count: u64, } +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SamplerExecProto {} + fn new_network_broadcast_exec( partitioning: Partitioning, schema: SchemaRef, diff --git a/src/worker/generated/worker.rs b/src/worker/generated/worker.rs index 74c906d1..da294d86 100644 --- a/src/worker/generated/worker.rs +++ b/src/worker/generated/worker.rs @@ -21,7 +21,7 @@ pub mod coordinator_to_worker_msg { } #[derive(Clone, PartialEq, ::prost::Message)] pub struct WorkerToCoordinatorMsg { - #[prost(oneof = "worker_to_coordinator_msg::Inner", tags = "1")] + #[prost(oneof = "worker_to_coordinator_msg::Inner", tags = "1, 2, 3")] pub inner: ::core::option::Option, } /// Nested message and enum types in `WorkerToCoordinatorMsg`. @@ -34,6 +34,12 @@ pub mod worker_to_coordinator_msg { /// metrics\[i\] is the set of metrics for plan node i in pre-order traversal order. #[prost(message, tag = "1")] TaskMetrics(super::TaskMetrics), + /// Load information reported by a task. This information is used for dynamically + /// sizing the number of workers involved in a query. 
+ #[prost(message, tag = "2")] + LoadInfo(super::LoadInfo), + #[prost(bool, tag = "3")] + LoadInfoEos(bool), } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -49,6 +55,21 @@ pub struct TaskMetrics { #[prost(message, optional, tag = "2")] pub task_metrics: ::core::option::Option, } +/// Load information reported for a specific partition with information about this +/// amount of data flowing through the plan. +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct LoadInfo { + /// The partition index to which this message belongs to. + #[prost(uint64, tag = "1")] + pub partition: u64, + /// Tha amount of bytes ready to be returned. + #[prost(uint64, tag = "2")] + pub bytes_ready: u64, + /// The estimated velocity at which data will flow through the node. If all the bytes were + /// already accumulated, they will be reported by `bytes_ready`, and this field will be 0. + #[prost(uint64, tag = "3")] + pub bytes_per_second: u64, +} #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct GetWorkerInfoRequest {} #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] @@ -119,6 +140,12 @@ pub struct ExecuteTaskRequest { /// The end of the partition range of the specified task that is going to be executed. #[prost(uint64, tag = "3")] pub target_partition_end: u64, + /// The amount of partitions per task that are going to consume from this task. + #[prost(uint64, tag = "4")] + pub consumer_partitions: u64, + /// The amount of tasks that are going to consume from this task. + #[prost(uint64, tag = "5")] + pub consumer_task_count: u64, } /// A key that uniquely identifies a task in a query. 
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] diff --git a/src/worker/impl_coordinator_channel.rs b/src/worker/impl_coordinator_channel.rs index b246085d..7990efcc 100644 --- a/src/worker/impl_coordinator_channel.rs +++ b/src/worker/impl_coordinator_channel.rs @@ -1,4 +1,5 @@ use crate::common::deserialize_uuid; +use crate::execution_plans::SamplerExec; use crate::work_unit_feed::RemoteWorkUnitFeedRegistry; use crate::worker::LocalWorkerContext; use crate::worker::generated::worker::coordinator_to_worker_msg::Inner; @@ -17,9 +18,10 @@ use datafusion::execution::SessionStateBuilder; use datafusion::prelude::SessionConfig; use datafusion_proto::physical_plan::AsExecutionPlan; use datafusion_proto::protobuf::PhysicalPlanNode; +use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; -use std::sync::Arc; use std::sync::atomic::AtomicUsize; +use std::sync::{Arc, OnceLock}; use tokio::sync::oneshot; use tonic::{Request, Response, Status, Streaming}; use url::Url; @@ -55,6 +57,7 @@ impl Worker { } let (metrics_tx, metrics_rx) = oneshot::channel(); + let mut load_info_rxs = vec![]; let task_data = || async { let headers = grpc_headers.into_headers(); @@ -98,11 +101,14 @@ impl Worker { for hook in self.hooks.on_plan.iter() { plan = hook(plan) } + load_info_rxs = + SamplerExec::kick_off_first_sampler(Arc::clone(&plan), Arc::clone(&task_ctx))?; // Initialize partition count to the number of partitions in the stage let total_partitions = plan.properties().partitioning.partition_count(); Ok::<_, DataFusionError>(TaskData { - plan, + base_plan: plan, + scaled_up_plan: Arc::new(OnceLock::new()), task_ctx, num_partitions_remaining: Arc::new(AtomicUsize::new(total_partitions)), metrics_tx: match collect_metrics { @@ -139,19 +145,35 @@ impl Worker { } }); + let load_info_stream = FuturesUnordered::from_iter(load_info_rxs) + .map(|load_info_or_err| match load_info_or_err { + Ok(load_info) => Ok(WorkerToCoordinatorMsg { + inner: 
Some(worker_to_coordinator_msg::Inner::LoadInfo(load_info)), + }), + Err(err) => Err(Status::internal(err.to_string())), + }) + .chain(futures::stream::once(async move { + Ok(WorkerToCoordinatorMsg { + inner: Some(worker_to_coordinator_msg::Inner::LoadInfoEos(true)), + }) + })); + // Stream back the metrics once the task finishes executing. // The oneshot receiver resolves when impl_execute_task sends the collected // metrics after all partitions have finished or been dropped. let metrics_stream = metrics_rx.into_stream(); let metrics_stream = metrics_stream.filter_map(|task_metrics| async move { match task_metrics { - Ok(task_metrics) => Some(WorkerToCoordinatorMsg { + Ok(task_metrics) => Some(Ok(WorkerToCoordinatorMsg { inner: Some(worker_to_coordinator_msg::Inner::TaskMetrics(task_metrics)), - }), + })), Err(_) => None, // channel dropped without sending any message } }); - Ok(Response::new(metrics_stream.map(Ok).boxed())) + + Ok(Response::new( + futures::stream::select(load_info_stream, metrics_stream).boxed(), + )) } } diff --git a/src/worker/impl_execute_task.rs b/src/worker/impl_execute_task.rs index 23c65932..ecd7ed8c 100644 --- a/src/worker/impl_execute_task.rs +++ b/src/worker/impl_execute_task.rs @@ -61,7 +61,10 @@ pub(crate) async fn execute_local_task( .map_err(DataFusionError::Shared)?; task_data.task_data_metrics.mark_execution_started_once(); - let plan = task_data.plan; + let plan = task_data.scaled_up_plan( + body.consumer_partitions as usize, + body.consumer_task_count as usize, + )?; let task_ctx = task_data.task_ctx; let d_cfg = DistributedConfig::from_config_options(task_ctx.session_config().options())?; let d_ctx = DistributedTaskContext::from_ctx(&task_ctx).as_ref().clone(); @@ -146,6 +149,7 @@ pub(crate) async fn execute_local_task( tokio::spawn(async move { entries.invalidate(&k).await; }); + task_data_metrics.mark_execution_finished(); if send_metrics { send_metrics_via_channel( &metrics_tx, diff --git a/src/worker/mod.rs 
b/src/worker/mod.rs index fc65575f..e89921fc 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -17,6 +17,5 @@ pub use session_builder::{ DefaultSessionBuilder, MappedWorkerSessionBuilder, MappedWorkerSessionBuilderExt, WorkerQueryContext, WorkerSessionBuilder, }; -pub use worker_service::Worker; - pub use task_data::TaskData; +pub use worker_service::Worker; diff --git a/src/worker/task_data.rs b/src/worker/task_data.rs index d3e17f02..d1962aeb 100644 --- a/src/worker/task_data.rs +++ b/src/worker/task_data.rs @@ -1,11 +1,14 @@ use crate::MaxLatencyMetric; +use crate::common::OnceLockResult; use crate::common::now_ns; +use crate::distributed_planner::network_boundary_scale_input; use crate::worker::generated::worker as pb; +use datafusion::error::{DataFusionError, Result}; use datafusion::execution::TaskContext; use datafusion::physical_expr_common::metrics::CustomMetricValue; -use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; use std::sync::Arc; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; use tokio::sync::oneshot; @@ -15,8 +18,8 @@ use tokio::sync::oneshot; pub struct TaskData { /// Task context suitable for execute different partitions from the same task. pub(super) task_ctx: Arc, - /// Plan to be executed. - pub(crate) plan: Arc, + pub(crate) base_plan: Arc, + pub(crate) scaled_up_plan: Arc>>, /// `num_partitions_remaining` is initialized to the total number of partitions in the task (not /// only tasks in the partition group). This is decremented for each request to the endpoint /// for this task. Once this count is zero, the task is likely complete. The task may not be @@ -105,12 +108,44 @@ impl TaskDataMetrics { impl TaskData { /// Returns the number of partitions remaining to be processed. 
pub(crate) fn num_partitions_remaining(&self) -> usize { - self.num_partitions_remaining - .load(std::sync::atomic::Ordering::Relaxed) + self.num_partitions_remaining.load(Ordering::SeqCst) } /// Returns the total number of partitions in this task. pub(crate) fn total_partitions(&self) -> usize { - self.plan.properties().partitioning.partition_count() + match self.scaled_up_plan.get() { + Some(Ok(plan)) => plan.output_partitioning().partition_count(), + _ => self + .base_plan + .properties() + .output_partitioning() + .partition_count(), + } + } + + pub(crate) fn scaled_up_plan( + &self, + consumer_partitions: usize, + consumer_task_count: usize, + ) -> Result> { + let result = self.scaled_up_plan.get_or_init(|| { + let scaled_up = match network_boundary_scale_input( + Arc::clone(&self.base_plan), + consumer_partitions, + consumer_task_count, + ) { + Ok(scaled_up) => scaled_up, + Err(err) => return Err(Arc::new(err)), + }; + + let partition_count = scaled_up.output_partitioning().partition_count(); + self.num_partitions_remaining + .store(partition_count, Ordering::SeqCst); + Ok(scaled_up) + }); + match result { + Ok(plan) => Ok(Arc::clone(plan)), + Err(err) => Err(DataFusionError::Shared(Arc::clone(err))), + } } } diff --git a/src/worker/test_utils/worker_handles.rs b/src/worker/test_utils/worker_handles.rs index a214070d..02556617 100644 --- a/src/worker/test_utils/worker_handles.rs +++ b/src/worker/test_utils/worker_handles.rs @@ -215,7 +215,8 @@ pub async fn register_plan_on_worker( swmr_task_data .write(Ok(TaskData { task_ctx, - plan, + base_plan: plan, + scaled_up_plan: Default::default(), num_partitions_remaining: Arc::new(AtomicUsize::new(partition_count)), metrics_tx: Arc::new(std::sync::Mutex::new(Some(metrics_tx))), task_data_metrics: Arc::new(TaskDataMetrics::new(0)), diff --git a/src/worker/worker.proto b/src/worker/worker.proto index 7aaeebc1..d7d4944b 100644 --- a/src/worker/worker.proto +++ b/src/worker/worker.proto @@ -31,6 +31,12 @@ message 
WorkerToCoordinatorMsg { // ensuring metrics are never lost due to early stream termination. // metrics[i] is the set of metrics for plan node i in pre-order traversal order. TaskMetrics task_metrics = 1; + + // Load information reported by a task. This information is used for dynamically + // sizing the number of workers involved in a query. + LoadInfo load_info = 2; + + bool load_info_eos = 3; } } @@ -45,6 +51,18 @@ message TaskMetrics { MetricsSet task_metrics = 2; } +// Load information reported for a specific partition with information about the +// amount of data flowing through the plan. +message LoadInfo { + // The partition index to which this message belongs. + uint64 partition = 1; + // The amount of bytes ready to be returned. + uint64 bytes_ready = 2; + // The estimated velocity at which data will flow through the node. If all the bytes were + // already accumulated, they will be reported by `bytes_ready`, and this field will be 0. + uint64 bytes_per_second = 3; +} + message GetWorkerInfoRequest {} message GetWorkerInfoResponse { @@ -95,6 +113,10 @@ message ExecuteTaskRequest { uint64 target_partition_start = 2; // The end of the partition range of the specified task that is going to be executed. uint64 target_partition_end = 3; + // The amount of partitions per task that are going to consume from this task. + uint64 consumer_partitions = 4; + // The amount of tasks that are going to consume from this task. + uint64 consumer_task_count = 5; } // A key that uniquely identifies a task in a query. 
diff --git a/src/worker/worker_connection_pool.rs b/src/worker/worker_connection_pool.rs index a05bfd5a..7e6a7d75 100644 --- a/src/worker/worker_connection_pool.rs +++ b/src/worker/worker_connection_pool.rs @@ -88,6 +88,8 @@ impl WorkerConnectionPool { input_stage: &RemoteStage, target_partitions: Range, target_task: usize, + consumer_partitions: usize, + consumer_task_count: usize, ctx: &Arc, ) -> Result<&(dyn WorkerConnection + Sync + Send)> { let Some(worker_connection) = self.connections.get(target_task) else { @@ -109,6 +111,8 @@ impl WorkerConnectionPool { input_stage, target_partitions, target_task, + consumer_partitions, + consumer_task_count, lw_ctx, &self.metrics, )) as Box<_>) @@ -118,6 +122,8 @@ impl WorkerConnectionPool { input_stage, target_partitions, target_task, + consumer_partitions, + consumer_task_count, ctx, &self.metrics, ) @@ -173,6 +179,8 @@ impl RemoteWorkerConnection { input_stage: &RemoteStage, target_partition_range: Range, target_task: usize, + consumer_partitions: usize, + consumer_task_count: usize, ctx: &Arc, metrics: &ExecutionPlanMetricsSet, ) -> Result { @@ -222,6 +230,8 @@ impl RemoteWorkerConnection { stage_id: input_stage.num as u64, task_number: target_task as u64, }), + consumer_partitions: consumer_partitions as u64, + consumer_task_count: consumer_task_count as u64, }, ); @@ -426,6 +436,8 @@ impl LocalWorkerConnection { input_stage: &RemoteStage, target_partition_range: Range, target_task: usize, + consumer_partitions: usize, + consumer_task_count: usize, lw_ctx: Arc, metrics: &ExecutionPlanMetricsSet, ) -> Self { @@ -446,6 +458,8 @@ impl LocalWorkerConnection { task_key: Some(task_key.clone()), target_partition_start: partition_i as u64, target_partition_end: (partition_i + 1) as u64, + consumer_partitions: consumer_partitions as u64, + consumer_task_count: consumer_task_count as u64, }; let task_data_entries = Arc::clone(&lw_ctx.task_data_entries); diff --git a/tests/metrics_collection.rs b/tests/metrics_collection.rs 
index 70bb98f7..ec267be3 100644 --- a/tests/metrics_collection.rs +++ b/tests/metrics_collection.rs @@ -5,6 +5,7 @@ mod tests { use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; use datafusion::execution::SessionState; use datafusion::physical_plan::display::DisplayableExecutionPlan; + use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion::physical_plan::{ExecutionPlan, execute_stream}; use datafusion::prelude::SessionContext; use datafusion_distributed::test_utils::localhost::start_localhost_context; @@ -255,6 +256,37 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_metrics_collection_dynamic() -> Result<(), Box> { + let (mut d_ctx, _guard, _) = start_localhost_context(3, DefaultSessionBuilder).await; + d_ctx.set_distributed_dynamic_task_count(true)?; + + let query = + r#"SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)"#; + + let s_ctx = SessionContext::default(); + let (s_physical, mut d_physical) = execute(&s_ctx, &d_ctx, query).await?; + d_physical = rewrite_with_metrics(d_physical, DistributedMetricsFormat::Aggregated).await; + println!("{}", display_plan_ascii(s_physical.as_ref(), true)); + println!("{}", display_plan_ascii(d_physical.as_ref(), true)); + + assert_metrics_equal::( + ["output_rows", "output_bytes"], + &s_physical, + &d_physical, + 0, + ); + + assert_metrics_equal::( + ["output_rows", "output_bytes"], + &s_physical, + &d_physical, + 0, + ); + + Ok(()) + } + /// Looks for an [ExecutionPlan] that matches the provided type parameter `T` in /// both root nodes and compares its metrics. /// There might be more than one, so `index` determines which one is compared.