Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
4 changes: 4 additions & 0 deletions src/distributed_planner/distributed_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ extensions_options! {
/// Disabled by default because its effectiveness is workload-dependent: it helps when
/// aggregation significantly reduces cardinality, but adds overhead when it does not.
pub partial_reduce: bool, default = false
/// When enabled, build per-task stage plans instead of reusing one shared stage plan
/// payload for all tasks. Each task's plan has its `PartitionIsolatorExec` nodes rewritten
/// with the partition group fixed at plan build time. Disabled by default while
/// task-specialized plans are rolled out incrementally.
pub task_specialized_stage_plans: bool, default = false
/// Soft byte budget that each per-worker connection will buffer in memory before pausing
/// the gRPC pull from that worker. Per-partition channels are unbounded (to avoid
/// head-of-line blocking between sibling partitions), so backpressure is enforced
Expand Down
3 changes: 3 additions & 0 deletions src/distributed_planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ mod partial_reduce_below_network_shuffles;
mod plan_annotator;
mod session_state_builder_ext;
mod task_estimator;
mod task_specialized_stage_plans;

pub use distributed_config::DistributedConfig;
pub use network_boundary::{NetworkBoundary, NetworkBoundaryExt};
pub use session_state_builder_ext::SessionStateBuilderExt;
pub use task_estimator::{TaskCountAnnotation, TaskEstimation, TaskEstimator, TaskRoutingContext};
pub(crate) use task_estimator::{get_distributed_task_estimator, set_distributed_task_estimator};

pub(crate) use task_specialized_stage_plans::build_task_specialized_stage_plans;
79 changes: 79 additions & 0 deletions src/distributed_planner/task_specialized_stage_plans.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use crate::execution_plans::PartitionIsolatorExec;
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion::common::{Result, internal_err};
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use std::sync::Arc;

/// Produces the per-task physical plans for a stage: one specialized copy of
/// `plan` for every task index in `0..task_count`, returned in task order.
///
/// Specialization currently replaces each `PartitionIsolatorExec` with a
/// `PartitionIsolatorExec::for_task(...)` node, so the partition group of every
/// task is decided while the plan is built.
///
/// # Errors
/// The first task whose specialization fails aborts the build and its error is
/// returned.
pub(crate) fn build_task_specialized_stage_plans(
    plan: &Arc<dyn ExecutionPlan>,
    task_count: usize,
) -> Result<Vec<Arc<dyn ExecutionPlan>>> {
    let mut specialized_plans = Vec::with_capacity(task_count);
    for task_index in 0..task_count {
        specialized_plans.push(specialize_plan_for_task(plan, task_index, task_count)?);
    }
    Ok(specialized_plans)
}

/// Rewrites `plan` for a single task by replacing every `PartitionIsolatorExec`
/// with a task-fixed one built through `PartitionIsolatorExec::for_task`.
///
/// The plan is walked with `transform_up`; nodes that are not a
/// `PartitionIsolatorExec` are returned unchanged. For each isolator, the
/// partition group is computed from its input's partition count together with
/// `task_index` and `task_count`.
///
/// # Errors
/// Returns an internal error when `task_index >= task_count`, or propagates any
/// error raised by the tree traversal.
fn specialize_plan_for_task(
    plan: &Arc<dyn ExecutionPlan>,
    task_index: usize,
    task_count: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    // The index check does not depend on the visited node, so validate it once
    // up front instead of once per isolator. `partition_group` indexes the
    // computed groups by task index, so an out-of-range index must be rejected
    // before it is ever called.
    if task_index >= task_count {
        return internal_err!(
            "invalid task specialization index {task_index} >= {task_count}"
        );
    }

    Arc::clone(plan)
        .transform_up(|node| {
            let Some(isolator) = node.as_any().downcast_ref::<PartitionIsolatorExec>() else {
                return Ok(Transformed::no(node));
            };

            let input = Arc::clone(&isolator.input);
            let input_partitions = input.output_partitioning().partition_count();
            let partition_group =
                PartitionIsolatorExec::partition_group(input_partitions, task_index, task_count);
            let specialized: Arc<dyn ExecutionPlan> =
                Arc::new(PartitionIsolatorExec::for_task(input, partition_group));
            Ok(Transformed::yes(specialized))
        })
        .data()
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution_plans::PartitionIsolatorExec;
    use crate::test_utils::mock_exec::MockExec;
    use datafusion::arrow::datatypes::Schema;

    /// Building specialized plans for two tasks yields two plans, each rooted at
    /// a `PartitionIsolatorExec` whose partition group has been fixed.
    ///
    /// Checking `fixed_partition_group` matters: the input plan is itself a
    /// `PartitionIsolatorExec`, so a type check alone would also pass if the
    /// rewrite never happened.
    #[test]
    fn returns_one_plan_per_task() {
        let exec: Arc<dyn ExecutionPlan> = Arc::new(MockExec::new_partitioned(
            vec![vec![], vec![], vec![], vec![]],
            Arc::new(Schema::empty()),
        ));
        let plan: Arc<dyn ExecutionPlan> = Arc::new(PartitionIsolatorExec::new(exec, 2));

        let plans = build_task_specialized_stage_plans(&plan, 2).unwrap();

        assert_eq!(plans.len(), 2);
        for (task_index, task_plan) in plans.iter().enumerate() {
            let isolator = task_plan
                .as_any()
                .downcast_ref::<PartitionIsolatorExec>()
                .unwrap_or_else(|| {
                    panic!("plan for task {task_index} is not a PartitionIsolatorExec")
                });
            assert!(
                isolator.fixed_partition_group.is_some(),
                "plan for task {task_index} was not specialized"
            );
        }
    }
}
31 changes: 23 additions & 8 deletions src/execution_plans/distributed.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::common::{require_one_child, serialize_uuid, task_ctx_with_extension};
use crate::config_extension_ext::get_config_extension_propagation_headers;
use crate::distributed_planner::{NetworkBoundaryExt, get_distributed_task_estimator};
use crate::distributed_planner::{
NetworkBoundaryExt, build_task_specialized_stage_plans, get_distributed_task_estimator,
};
use crate::execution_plans::ChildrenIsolatorUnionExec;
use crate::networking::get_distributed_worker_resolver;
use crate::passthrough_headers::get_passthrough_headers;
Expand Down Expand Up @@ -198,7 +200,7 @@ impl DistributedExec {
let stage = plan.input_stage();

let mut spawner =
CoordinatorToWorkerTaskSpawner::new(stage, &metrics, &codec, &mut join_set)?;
CoordinatorToWorkerTaskSpawner::new(stage, &metrics, &codec, &mut join_set, ctx)?;

let routed_urls = match task_estimator.route_tasks(&TaskRoutingContext {
task_ctx: Arc::clone(ctx),
Expand Down Expand Up @@ -362,7 +364,7 @@ type WorkerResponseRx =

struct CoordinatorToWorkerTaskSpawner<'a> {
plan: &'a Arc<dyn ExecutionPlan>,
plan_proto: Vec<u8>,
plan_protos: Vec<Vec<u8>>,
query_id: Uuid,
stage_id: usize,
task_count: usize,
Expand All @@ -378,17 +380,30 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> {
metrics: &'a CoordinatorToWorkerMetrics,
codec: &'a dyn PhysicalExtensionCodec,
join_set: &'a mut JoinSet<Result<()>>,
ctx: &Arc<TaskContext>,
) -> Result<Self> {
let Some(plan) = &stage.plan else {
return internal_err!("Plan is not set for stage {}", stage.num);
};

let plan_proto =
PhysicalPlanNode::try_from_physical_plan(Arc::clone(plan), codec)?.encode_to_vec();
let task_count = stage.tasks.len();
let d_cfg = DistributedConfig::from_config_options(ctx.session_config().options())?;

let plans = if d_cfg.task_specialized_stage_plans {
build_task_specialized_stage_plans(plan, task_count)?
} else {
vec![Arc::clone(plan); task_count]
};

let mut plan_protos = Vec::with_capacity(plans.len());
for plan in plans {
plan_protos
.push(PhysicalPlanNode::try_from_physical_plan(plan, codec)?.encode_to_vec());
}

Ok(Self {
plan,
plan_proto,
plan_protos,
query_id: stage.query_id,
stage_id: stage.num,
task_count: stage.tasks.len(),
Expand Down Expand Up @@ -464,14 +479,14 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> {
};
let msg = CoordinatorToWorkerMsg {
inner: Some(Inner::SetPlanRequest(SetPlanRequest {
plan_proto: self.plan_proto.clone(),
plan_proto: self.plan_protos[task_i].clone(),
task_count: self.task_count as u64,
task_key: Some(task_key.clone()),
work_unit_feed_declarations,
target_worker_url: url.to_string(),
})),
};
let plan_size = self.plan_proto.len();
let plan_size = self.plan_protos[task_i].len();

let (coordinator_to_worker_tx, coordinator_to_worker_rx) =
tokio::sync::mpsc::unbounded_channel();
Expand Down
27 changes: 22 additions & 5 deletions src/execution_plans/partition_isolator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub struct PartitionIsolatorExec {
pub(crate) input: Arc<dyn ExecutionPlan>,
pub(crate) properties: Arc<PlanProperties>,
pub(crate) n_tasks: usize,
pub(crate) fixed_partition_group: Option<Vec<usize>>,
pub(crate) metrics: ExecutionPlanMetricsSet,
}

Expand All @@ -72,6 +73,7 @@ impl PartitionIsolatorExec {
input: input.clone(),
properties: Arc::new(properties),
n_tasks,
fixed_partition_group: None,
metrics: ExecutionPlanMetricsSet::new(),
}
}
Expand All @@ -98,6 +100,19 @@ impl PartitionIsolatorExec {
) -> Vec<usize> {
Self::partition_groups(input_partitions, n_tasks)[task_i].clone()
}

/// Creates an isolator bound to one task's already-resolved partition group.
///
/// The node copies `input`'s plan properties, overriding the partitioning with
/// `Partitioning::UnknownPartitioning` sized to `partition_group.len()`. It
/// records `n_tasks` as 1 and stores the group in `fixed_partition_group`.
pub(crate) fn for_task(input: Arc<dyn ExecutionPlan>, partition_group: Vec<usize>) -> Self {
    let group_size = partition_group.len();
    let base_properties = <PlanProperties as Clone>::clone(input.properties());
    let task_properties =
        base_properties.with_partitioning(Partitioning::UnknownPartitioning(group_size));

    Self {
        input,
        properties: Arc::new(task_properties),
        n_tasks: 1,
        fixed_partition_group: Some(partition_group),
        metrics: ExecutionPlanMetricsSet::new(),
    }
}
}

impl DisplayAs for PartitionIsolatorExec {
Expand Down Expand Up @@ -165,11 +180,13 @@ impl ExecutionPlan for PartitionIsolatorExec {
let metric = MetricBuilder::new(&self.metrics).output_rows(partition);
let input_partitions = self.input.output_partitioning().partition_count();

let partition_group = Self::partition_group(
input_partitions,
task_context.task_index,
task_context.task_count,
);
let partition_group = self.fixed_partition_group.clone().unwrap_or_else(|| {
Self::partition_group(
input_partitions,
task_context.task_index,
task_context.task_count,
)
});

// if our partition group is [7,8,9] and we are asked for partition 1,
// then look up that index in our group and execute that partition, in this
Expand Down