perf(physical-optimizer): skip ensure_distribution rebuild when children are unchanged#22521
Open
zhuqi-lucas wants to merge 2 commits into
Open
perf(physical-optimizer): skip ensure_distribution rebuild when children are unchanged#22521zhuqi-lucas wants to merge 2 commits into
zhuqi-lucas wants to merge 2 commits into
Conversation
…ren are unchanged ensure_distribution was unconditionally calling plan.with_new_children after collecting the (possibly redistributed) children, even when none of the children were actually replaced. For nodes like ProjectionExec, that path runs through try_new and recomputes the schema, equivalence properties, and output ordering each time, which is pure overhead when the input Arcs are identical. Compare each new child Arc with the original via Arc::ptr_eq and reuse the existing plan Arc when nothing changed. The UnionExec to InterleaveExec special case still runs first because it intentionally produces a new node. On a representative deep ProjectionExec stack (30 layers over a sorted parquet scan, no join / aggregate / unmet ordering, 5000 iterations) this brings ensure_distribution from 170.55 us/call to 59.36 us/call, a ~2.87x speedup. Profiling on a real workload dominated by point queries showed ProjectionExec::with_new_children taking 1.94s out of a 2.87s ensure_distribution slice in a 60s sample, so this is the bulk of the rule's cost on that shape. Closes apache#22520
Contributor
There was a problem hiding this comment.
Pull request overview
This PR optimizes the physical optimizer’s distribution enforcement by avoiding unnecessary plan-node rebuilds when the computed child plans are identical to the existing children, reducing planning CPU overhead (notably for deep ProjectionExec stacks) without changing correctness.
Changes:
- Detect when all post-processing child plan
Arcs are pointer-identical to the original children viaArc::ptr_eq. - Reuse the existing
planArcand skipwith_new_childrenwhen children are unchanged. - Preserve the existing
UnionExec→InterleaveExecspecial-case behavior by applying it before the “children unchanged” fast-path.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Closes #22520.
Rationale for this change
ensure_distributionindatafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rsunconditionally callsplan.with_new_children(children_plans)after collecting the (possibly redistributed) children, even when none of those children were actually replaced. For nodes likeProjectionExec, that path runs throughtry_newand recomputes the schema, equivalence properties, output ordering, and output partitioning, then allocates a newArc<dyn ExecutionPlan>. When every child Arc is pointer-identical to the input, that work produces a logically identical node — pure overhead.The cost is amplified by two factors:
ensure_distributionto inject aRepartitionExecorSortExecfor) hit this wasted rebuild at every node in the plan. A 5–30 deepProjectionExecstack pays the cost N times.ProjectionExec::try_newareO(num_columns): per-columndata_type/nullablelookup to build the new schema, per-column remapping of equivalence classes through the projection mapping, and per-column lookup when rewritingPhysicalSortExprs into the output ordering. Wide schemas (tens of columns) make every wasted rebuild proportionally heavier.Profiling a production point-query workload (wide schemas, deep
ProjectionExecstacks) showedProjectionExec::with_new_childrenas the single largest cost insideensure_distribution:ensure_distributiontotal: 2.87s of a 60s CPU sampleProjectionExec::with_new_children: 1.94s (56% of the rule)SortExec::with_new_children: 0.11sWhat changes are included in this PR?
After collecting
children_plans, compare each new child Arc with the original viaArc::ptr_eq. When every child is unchanged, reuse the existingplanArc and skipwith_new_children. TheUnionExectoInterleaveExecspecial case still runs first because it intentionally produces a new node even when child Arcs are unchanged.This relies on the fact that
ensure_distributionalready produces pointer-identical Arcs for children that need no redistribution (it threads the original Arc through unchanged), soArc::ptr_eqprecisely distinguishes "rewritten" from "untouched" children at O(1) per child.Are these changes tested?
Yes. The existing
enforce_distributionsuite passes unchanged (66/66):The behavior is observable only as a CPU reduction; correctness is preserved because
ExecutionPlannodes are immutable, so reusing the original Arc produces the same plan tree aswith_new_children(unchanged_children)would have, just without the schema / ordering / equivalence / partitioning recomputation.Are there any user-facing changes?
No. Same plans, lower planning time.
Micro-benchmark
Plan shape: 30-deep
ProjectionExecstack over a sorted parquet scan, 5000 iterations.Wider schemas (more projection expressions per node) widen the gap further because each skipped
with_new_childrenavoids more O(num_columns) work.