Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3971,3 +3971,38 @@ fn adjust_input_keys_ordering_no_transform_for_filter_scan() -> Result<()> {
);
Ok(())
}

/// Verifies the `ensure_distribution` fast path: when no child of a node is
/// replaced (no `RepartitionExec` or `SortExec` injection is required),
/// the rule must reuse the input `Arc<dyn ExecutionPlan>` unchanged instead
/// of calling `with_new_children`. For a deep `ProjectionExec` chain over a
/// single-partition scan with `target_partitions = 1`, every node hits this
/// fast path, so the root returned by `ensure_distribution` must be the
/// same `Arc` as the input.
///
/// Regression test for the optimization that avoids
/// `ProjectionExec::with_new_children` (which recomputes schema, equivalence
/// properties, output ordering, and partitioning) on the common point-query
/// plan shape.
#[test]
fn ensure_distribution_reuses_plan_arc_when_no_redistribution_needed() -> Result<()> {
let scan = parquet_exec();
let proj1 = projection_exec_with_alias(
scan,
vec![
("a".to_string(), "a".to_string()),
("b".to_string(), "b".to_string()),
],
);
let proj2 =
projection_exec_with_alias(proj1, vec![("a".to_string(), "a".to_string())]);
let plan: Arc<dyn ExecutionPlan> = proj2;

let result = ensure_distribution_helper(Arc::clone(&plan), 1, false)?;

assert!(
Arc::ptr_eq(&result, &plan),
"ensure_distribution must reuse the input Arc when no children require redistribution"
);
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ use datafusion_physical_plan::tree_node::PlanContext;
use datafusion_physical_plan::union::{InterleaveExec, UnionExec, can_interleave};
use datafusion_physical_plan::windows::WindowAggExec;
use datafusion_physical_plan::windows::{BoundedWindowAggExec, get_best_fitting_window};
use datafusion_physical_plan::{Distribution, ExecutionPlan, Partitioning};
use datafusion_physical_plan::{
Distribution, ExecutionPlan, Partitioning, with_new_children_if_necessary,
};

use itertools::izip;

Expand Down Expand Up @@ -1362,7 +1364,16 @@ pub fn ensure_distribution(
// Data
Arc::new(InterleaveExec::try_new(children_plans)?)
} else {
plan.with_new_children(children_plans)?
// Route through `with_new_children_if_necessary` so the common
// case where no child was replaced above skips the expensive
// `with_new_children` rebuild. For nodes like `ProjectionExec`,
// `with_new_children` recomputes schema / equivalence properties /
// output ordering via `try_new` even when the input Arcs are
// identical, which dominates `ensure_distribution` time on deep
// projection stacks over plans where no distribution change
// applies (point queries with no join / aggregate / unmet
// ordering).
with_new_children_if_necessary(plan, children_plans)?
};

Ok(Transformed::yes(DistributionContext::new(
Expand Down
Loading