diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index 4ff1fad8f52b9..7cda84c3dc437 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -3374,3 +3374,44 @@ fn test_filter_pushdown_through_sort_with_projection() { " ); } + +/// `FilterPushdown::new_post_optimization()` must be idempotent. When applied +/// to a HashJoinExec, the rule installs a dynamic filter on the probe-side +/// scan; before the fix in `HashJoinExec::gather_filters_for_pushdown`, every +/// invocation created a *new* `DynamicFilterPhysicalExpr` and ANDed it onto +/// the probe side's existing predicate, producing +/// `DynamicFilter AND DynamicFilter AND ...` after N passes. +/// +/// AQE (datafusion-ballista#1359) re-runs the optimizer chain after every +/// completed stage, so this would compound indefinitely without the guard. +#[test] +fn post_phase_is_idempotent_on_hash_join() { + use crate::physical_optimizer::test_utils::{ + hash_join_exec, parquet_exec, schema, + }; + use datafusion_common::JoinType; + use datafusion_physical_expr::expressions::Column; + use datafusion_physical_optimizer::filter_pushdown::FilterPushdown; + use datafusion_physical_plan::get_plan_string; + use datafusion_physical_plan::joins::utils::JoinOn; + + let s = schema(); + let left = parquet_exec(Arc::clone(&s)); + let right = parquet_exec(Arc::clone(&s)); + let join_on: JoinOn = vec![( + Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()), + Arc::new(Column::new_with_schema("a", &right.schema()).unwrap()), + )]; + let plan = hash_join_exec(left, right, join_on, None, &JoinType::Inner).unwrap(); + + let config = ConfigOptions::new(); + let rule = FilterPushdown::new_post_optimization(); + let once = rule.optimize(plan, &config).unwrap(); + let twice = rule.optimize(Arc::clone(&once), &config).unwrap(); + + assert_eq!( + get_plan_string(&once), + get_plan_string(&twice), + "second invocation of FilterPushdown::new_post_optimization mutated the plan", + ); +} diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 4ebbf7cb31ccf..067f646a2cdbb 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1640,8 +1640,14 @@ impl ExecutionPlan for HashJoinExec { ChildFilterDescription::all_unsupported(&parent_filters) }; - // Add dynamic filters in Post phase if enabled + // Add dynamic filters in Post phase if enabled. Skip when this join + // already carries a dynamic filter from a previous pass — the shared + // `Arc` is still wired into the probe-side + // scan's predicate, and re-creating it would AND a fresh duplicate + // onto every Post-phase invocation (apache/datafusion-ballista#1359 + // surfaces this in AQE replan loops). if phase == FilterPushdownPhase::Post + && self.dynamic_filter.is_none() && self.allow_join_dynamic_filter_pushdown(config) { // Add actual dynamic filter to right side (probe side)