diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/src/distributed_planner/distributed_config.rs b/src/distributed_planner/distributed_config.rs index 5b3500bc..28bf47d9 100644 --- a/src/distributed_planner/distributed_config.rs +++ b/src/distributed_planner/distributed_config.rs @@ -58,6 +58,10 @@ extensions_options! { /// Disabled by default because its effectiveness is workload-dependent: it helps when /// aggregation significantly reduces cardinality, but adds overhead when it does not. pub partial_reduce: bool, default = false + /// When enabled, build per-task stage plans instead of reusing one shared stage plan + /// payload for all tasks. Disabled by default; currently a no-op scaffold used for + /// incremental rollout of task-specialized plans. + pub task_specialized_stage_plans: bool, default = false /// Soft byte budget that each per-worker connection will buffer in memory before pausing /// the gRPC pull from that worker. Per-partition channels are unbounded (to avoid /// head-of-line blocking between sibling partitions), so backpressure is enforced diff --git a/src/distributed_planner/mod.rs b/src/distributed_planner/mod.rs index b945969b..48d8b943 100644 --- a/src/distributed_planner/mod.rs +++ b/src/distributed_planner/mod.rs @@ -6,9 +6,12 @@ mod partial_reduce_below_network_shuffles; mod plan_annotator; mod session_state_builder_ext; mod task_estimator; +mod task_specialized_stage_plans; pub use distributed_config::DistributedConfig; pub use network_boundary::{NetworkBoundary, NetworkBoundaryExt}; pub use session_state_builder_ext::SessionStateBuilderExt; pub use task_estimator::{TaskCountAnnotation, TaskEstimation, TaskEstimator, TaskRoutingContext}; pub(crate) use task_estimator::{get_distributed_task_estimator, set_distributed_task_estimator}; + +pub(crate) use task_specialized_stage_plans::build_task_specialized_stage_plans; diff --git a/src/distributed_planner/task_specialized_stage_plans.rs b/src/distributed_planner/task_specialized_stage_plans.rs new file mode 100644 index 00000000..4979b892 --- /dev/null +++ b/src/distributed_planner/task_specialized_stage_plans.rs @@ -0,0 +1,79 @@ +use crate::execution_plans::PartitionIsolatorExec; +use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion::common::{Result, internal_err}; +use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; +use std::sync::Arc; + +/// Builds one physical plan per task for a stage. +/// +/// The current specialization pass rewrites `PartitionIsolatorExec` nodes into +/// task-fixed `PartitionIsolatorExec::for_task(...)` nodes, fixing the partition +/// group at plan build time for each task. +pub(crate) fn build_task_specialized_stage_plans( + plan: &Arc, + task_count: usize, +) -> Result>> { + (0..task_count) + .map(|task_index| specialize_plan_for_task(plan, task_index, task_count)) + .collect() +} + +fn specialize_plan_for_task( + plan: &Arc, + task_index: usize, + task_count: usize, +) -> Result> { + Arc::clone(plan) + .transform_up(|node| { + let Some(isolator) = node.as_any().downcast_ref::() else { + return Ok(Transformed::no(node)); + }; + + let input = Arc::clone(&isolator.input); + let input_partitions = input.output_partitioning().partition_count(); + if task_index >= task_count { + return internal_err!( + "invalid task specialization index {task_index} >= {task_count}" + ); + } + let partition_group = + PartitionIsolatorExec::partition_group(input_partitions, task_index, task_count); + let specialized: Arc = + Arc::new(PartitionIsolatorExec::for_task(input, partition_group)); + Ok(Transformed::yes(specialized)) + }) + .data() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::execution_plans::PartitionIsolatorExec; + use crate::test_utils::mock_exec::MockExec; + use datafusion::arrow::datatypes::Schema; + + #[test] + fn returns_one_plan_per_task() { + let exec: Arc = Arc::new(MockExec::new_partitioned( + vec![vec![], vec![], vec![], vec![]], + Arc::new(Schema::empty()), + )); + let plan: Arc = Arc::new(PartitionIsolatorExec::new(exec, 2)); + + let plans = build_task_specialized_stage_plans(&plan, 2).unwrap(); + + assert_eq!(plans.len(), 2); + assert!( + plans[0] + .as_any() + .downcast_ref::() + .is_some() + ); + assert!( + plans[1] + .as_any() + .downcast_ref::() + .is_some() + ); + } +} \ No newline at end of file diff --git a/src/execution_plans/distributed.rs b/src/execution_plans/distributed.rs index c718bfee..5ae51e12 100644 --- a/src/execution_plans/distributed.rs +++ b/src/execution_plans/distributed.rs @@ -1,6 +1,8 @@ use crate::common::{require_one_child, serialize_uuid, task_ctx_with_extension}; use crate::config_extension_ext::get_config_extension_propagation_headers; -use crate::distributed_planner::{NetworkBoundaryExt, get_distributed_task_estimator}; +use crate::distributed_planner::{ + NetworkBoundaryExt, build_task_specialized_stage_plans, get_distributed_task_estimator, +}; use crate::execution_plans::ChildrenIsolatorUnionExec; use crate::networking::get_distributed_worker_resolver; use crate::passthrough_headers::get_passthrough_headers; @@ -198,7 +200,7 @@ impl DistributedExec { let stage = plan.input_stage(); let mut spawner = - CoordinatorToWorkerTaskSpawner::new(stage, &metrics, &codec, &mut join_set)?; + CoordinatorToWorkerTaskSpawner::new(stage, &metrics, &codec, &mut join_set, ctx)?; let routed_urls = match task_estimator.route_tasks(&TaskRoutingContext { task_ctx: Arc::clone(ctx), @@ -362,7 +364,7 @@ type WorkerResponseRx = struct CoordinatorToWorkerTaskSpawner<'a> { plan: &'a Arc, - plan_proto: Vec, + plan_protos: Vec>, query_id: Uuid, stage_id: usize, task_count: usize, @@ -378,17 +380,30 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { metrics: &'a CoordinatorToWorkerMetrics, codec: &'a dyn PhysicalExtensionCodec, join_set: &'a mut JoinSet>, + ctx: &Arc, ) -> Result { let Some(plan) = &stage.plan else { return internal_err!("Plan is not set for stage {}", stage.num); }; - let plan_proto = - PhysicalPlanNode::try_from_physical_plan(Arc::clone(plan), codec)?.encode_to_vec(); + let task_count = stage.tasks.len(); + let d_cfg = DistributedConfig::from_config_options(ctx.session_config().options())?; + + let plans = if d_cfg.task_specialized_stage_plans { + build_task_specialized_stage_plans(plan, task_count)? + } else { + vec![Arc::clone(plan); task_count] + }; + + let mut plan_protos = Vec::with_capacity(plans.len()); + for plan in plans { + plan_protos + .push(PhysicalPlanNode::try_from_physical_plan(plan, codec)?.encode_to_vec()); + } Ok(Self { plan, - plan_proto, + plan_protos, query_id: stage.query_id, stage_id: stage.num, task_count: stage.tasks.len(), @@ -464,14 +479,14 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { }; let msg = CoordinatorToWorkerMsg { inner: Some(Inner::SetPlanRequest(SetPlanRequest { - plan_proto: self.plan_proto.clone(), + plan_proto: self.plan_protos[task_i].clone(), task_count: self.task_count as u64, task_key: Some(task_key.clone()), work_unit_feed_declarations, target_worker_url: url.to_string(), })), }; - let plan_size = self.plan_proto.len(); + let plan_size = self.plan_protos[task_i].len(); let (coordinator_to_worker_tx, coordinator_to_worker_rx) = tokio::sync::mpsc::unbounded_channel(); diff --git a/src/execution_plans/partition_isolator.rs b/src/execution_plans/partition_isolator.rs index 4207c416..9a9c269f 100644 --- a/src/execution_plans/partition_isolator.rs +++ b/src/execution_plans/partition_isolator.rs @@ -56,6 +56,7 @@ pub struct PartitionIsolatorExec { pub(crate) input: Arc, pub(crate) properties: Arc, pub(crate) n_tasks: usize, + pub(crate) fixed_partition_group: Option>, pub(crate) metrics: ExecutionPlanMetricsSet, } @@ -72,6 +73,7 @@ impl PartitionIsolatorExec { input: input.clone(), properties: Arc::new(properties), n_tasks, + fixed_partition_group: None, metrics: ExecutionPlanMetricsSet::new(), } } @@ -98,6 +100,19 @@ impl PartitionIsolatorExec { ) -> Vec { Self::partition_groups(input_partitions, n_tasks)[task_i].clone() } + + pub(crate) fn for_task(input: Arc, partition_group: Vec) -> Self { + let properties = ::clone(&input.properties().clone()) + .with_partitioning(Partitioning::UnknownPartitioning(partition_group.len())); + + Self { + input, + properties: Arc::new(properties), + n_tasks: 1, + fixed_partition_group: Some(partition_group), + metrics: ExecutionPlanMetricsSet::new(), + } + } } impl DisplayAs for PartitionIsolatorExec { @@ -165,11 +180,13 @@ impl ExecutionPlan for PartitionIsolatorExec { let metric = MetricBuilder::new(&self.metrics).output_rows(partition); let input_partitions = self.input.output_partitioning().partition_count(); - let partition_group = Self::partition_group( - input_partitions, - task_context.task_index, - task_context.task_count, - ); + let partition_group = self.fixed_partition_group.clone().unwrap_or_else(|| { + Self::partition_group( + input_partitions, + task_context.task_index, + task_context.task_count, + ) + }); // if our partition group is [7,8,9] and we are asked for parittion 1, // then look up that index in our group and execute that partition, in this