fix(physical-optimizer): make OutputRequirements idempotent#22522
Open
wirybeaver wants to merge 1 commit into
Open
fix(physical-optimizer): make OutputRequirements idempotent#22522wirybeaver wants to merge 1 commit into
wirybeaver wants to merge 1 commit into
Conversation
`OutputRequirements::new_add_mode()` was stacking a new `OutputRequirementExec` wrapper at the top of the plan on every invocation. `require_top_ordering_helper` descends through any operator that maintains input order and has no hard ordering requirement, and `OutputRequirementExec` itself qualifies — so on a second pass the helper walked past the existing wrapper, found no SortExec/SPM beneath, and `require_top_ordering` added a fresh wrapper above the old one. Fix: at the start of `require_top_ordering`, return the plan unchanged when it is already topped by an `OutputRequirementExec`. The existing wrapper was either added by this rule's prior run (in which case it is already correct) or inserted intentionally by the caller (in which case we should not disturb it). Motivation: adaptive execution in datafusion-ballista AQE (apache/datafusion-ballista#1359) re-runs the entire `PhysicalOptimizer` chain after every completed stage. Although the chain-level effect is masked when `OutputRequirements::new_remove_mode()` later strips the wrapper, the rule itself violates the idempotence property that `PhysicalOptimizerRule`s are expected to satisfy. Surfaced by the discovery harness added in a sibling PR. Adds `datafusion/core/tests/physical_optimizer/output_requirements.rs` with a focused test that invokes `new_add_mode` twice on a bare parquet scan and asserts structural equality. Fails before this fix (two stacked wrappers); passes after. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Related to apache/datafusion-ballista#1359
Rationale
Ballista's Adaptive Query Execution (AQE) planner re-invokes DataFusion's full
PhysicalOptimizerchain after every completed stage (AdaptivePlanner::replan_stages). Rules that are not idempotent (rule(rule(x)) != rule(x)) stack execution-plan nodes on each pass.OutputRequirements::new_add_mode()wraps the plan root withOutputRequirementExecto preserve global ordering/distribution requirements. On a second pass the wrapper'smaintains_input_order() == [true]andrequired_input_ordering() == [None]causerequire_top_ordering_helperto recurse through it and produce a second wrapper, yieldingOutputRequirementExec(OutputRequirementExec(...)). Each AQE replan adds another layer.What changes are included in this PR?
require_top_ordering(): if the plan root is already anOutputRequirementExec, return it unchanged. This makes the rule idempotent with zero overhead for single-pass use.new_add_mode()andrequire_top_ordering()documenting the idempotence guarantee.tests/physical_optimizer/output_requirements.rs:add_mode_is_idempotent_on_bare_scan— bareParquetExec(exercisesis_changed = falsepath).add_mode_is_idempotent_on_sorted_plan—SortExec → ParquetExec(exercisesis_changed = truepath).Are these changes tested?
Yes. Two new tests run the rule twice on distinct fixtures and assert structural equality via
get_plan_string. Both fail without the fix (double-wrappedOutputRequirementExec) and pass with it.Are there any user-facing changes?
No.
OutputRequirementExecis an internal ancillary node stripped before execution; the idempotence guard only affects re-optimization scenarios (AQE).🤖 Generated with Claude Code