fix(physical-plan): make HashJoinExec dynamic filter pushdown idempotent#22523
Open
wirybeaver wants to merge 1 commit into
Open
fix(physical-plan): make HashJoinExec dynamic filter pushdown idempotent#22523wirybeaver wants to merge 1 commit into
wirybeaver wants to merge 1 commit into
Conversation
`FilterPushdown::new_post_optimization()` was ANDing a fresh `DynamicFilterPhysicalExpr` onto the probe-side scan's predicate on every invocation. After N passes the probe-side data source carried `DynamicFilter AND DynamicFilter AND ...` (N copies of the same empty expression). Root cause: `HashJoinExec::gather_filters_for_pushdown` always created a new dynamic filter in the Post phase, regardless of whether the join already had one from a prior pass. The previous-pass filter is retained on the `HashJoinExec` itself (the `dynamic_filter` field is preserved through `with_new_children` via the builder) and the same `Arc<DynamicFilterPhysicalExpr>` is wired into the probe-side scan's predicate, so a new one is redundant and stacks duplicates. Fix: skip dynamic filter creation when `self.dynamic_filter.is_some()`. The existing shared `Arc` keeps the probe-side scan correctly wired to the build-side accumulator that will populate it. Motivation: adaptive execution in datafusion-ballista AQE (apache/datafusion-ballista#1359) re-runs the entire `PhysicalOptimizer` chain after every completed stage. Unlike `OutputRequirements` (whose duplicate wrappers are masked by `new_remove_mode` later in the chain), this duplication survives to the executed plan and degrades scan performance with redundant filter evaluation. Adds `post_phase_is_idempotent_on_hash_join` to `datafusion/core/tests/physical_optimizer/filter_pushdown.rs`: builds a hash join over two parquet scans, invokes the rule twice, asserts the plan strings match. Fails before this fix (two `AND DynamicFilter` clauses on the probe side); passes after. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Related to apache/datafusion-ballista#1359
Rationale
Ballista's Adaptive Query Execution (AQE) planner re-invokes DataFusion's full
PhysicalOptimizerchain after every completed stage.FilterPushdown::new_post_optimization()is not idempotent on plans containingHashJoinExec.In the
Postphase,HashJoinExec::gather_filters_for_pushdownunconditionally creates a newDynamicFilterPhysicalExprand installs it on the probe-side child viawith_self_filter. After pass 1 the join already carries adynamic_filter: Some(...), and the sharedArc<DynamicFilterPhysicalExpr>is already wired into the probe-side scan's predicate. On pass 2 a second dynamic filter is created and ANDed onto the existing predicate, producingDynamicFilter AND DynamicFilter. Each subsequent pass adds another duplicate, compounding indefinitely in AQE replan loops.What changes are included in this PR?
HashJoinExec::gather_filters_for_pushdown: skip dynamic-filter creation whenself.dynamic_filter.is_some(), meaning a previous pass already installed one. The existingArcremains valid and correctly wired into the probe-side scan.post_phase_is_idempotent_on_hash_joinintests/physical_optimizer/filter_pushdown.rs: builds aHashJoinExec, runsFilterPushdown::new_post_optimization()twice, and asserts structural equality viaget_plan_string.Are these changes tested?
Yes. The new test fails without the fix (plan strings diverge due to duplicated dynamic filter predicates) and passes with it.
Are there any user-facing changes?
No. Dynamic filter pushdown is an internal optimization; the idempotence guard only affects re-optimization scenarios (AQE).
🤖 Generated with Claude Code