From 40445b21ffc0be421e3349099e45229271d11ce6 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Thu, 21 May 2026 19:29:53 -0400 Subject: [PATCH 1/2] perf: descend through Projection in EliminateOuterJoin EliminateOuterJoin previously only matched the literal Filter -> Join pattern. When a Projection sits between the Filter and the Join, the rule no-ops and the outer join stays in place even when the predicate above the projection would justify converting it. A common shape that hits this comes from projection pruning after filter pushdown. In TPC-DS q49, PushDownFilter moves the returns-side predicate above the sales/returns LEFT JOIN, then OptimizeProjections inserts a pruning Projection between that Filter and the LEFT JOIN. The returns-side predicate still filters out the outer rows, but the projection hides the join from the old rule. Extend the rule to walk down through Projection nodes between Filter and Join, rewriting a working copy of the predicate into the join's coordinate space for analysis. The rewritten predicate is used only for analysis; the original predicate and surrounding plan structure are preserved on success. Tests cover passthrough projection, aliased projection, negative cases, a non-transparent Limit guard, and SQL-level q49-shaped cases where OptimizeProjections places a pruning Projection between a returns-side Filter and the sales/returns LEFT JOIN. --- .../optimizer/src/eliminate_outer_join.rs | 390 ++++++++++++++---- .../test_files/eliminate_outer_join.slt | 107 +++++ 2 files changed, 407 insertions(+), 90 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index 748b04d5cf718..180fad7893009 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -15,39 +15,66 @@ // specific language governing permissions and limitations // under the License. -//! [`EliminateOuterJoin`] converts `LEFT/RIGHT/FULL` joins to `INNER` joins +//! [`EliminateOuterJoin`] rewrites outer joins to simpler join types when +//! filters make the outer rows unnecessary (e.g. `LEFT`/`RIGHT` to `INNER`, +//! and `FULL` to `LEFT`/`RIGHT`/`INNER`). +use crate::push_down_filter::replace_cols_by_name; use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_common::{Column, DFSchema, Result}; -use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan}; +use datafusion_common::{Column, DFSchema, Result, qualified_name}; +use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan, Projection}; use datafusion_expr::{Expr, Filter, Operator}; use crate::optimizer::ApplyOrder; use datafusion_common::tree_node::Transformed; use datafusion_expr::expr::{BinaryExpr, Cast, InList, Like, TryCast}; +use std::collections::HashMap; use std::sync::Arc; +/// Attempt to simplify outer joins when filters make their null-padded +/// rows impossible to observe. /// -/// Attempt to replace outer joins with inner joins. +/// Outer joins are generally more expensive than inner joins and can block +/// predicate pushdown and other optimizations. When a filter above an outer +/// join removes every row the join would add for unmatched input rows, the +/// join can be changed to a cheaper join type. /// -/// Outer joins are typically more expensive to compute at runtime -/// than inner joins and prevent various forms of predicate pushdown -/// and other optimizations, so removing them if possible is beneficial. +/// For example: /// -/// Inner joins filter out rows that do match. Outer joins pass rows -/// that do not match padded with nulls. If there is a filter in the -/// query that would filter any such null rows after the join the rows -/// introduced by the outer join are filtered. +/// ```sql +/// SELECT ... +/// FROM a LEFT JOIN b ON ... +/// WHERE b.xx = 100 +/// ``` /// -/// For example, in the `select ... from a left join b on ... where b.xx = 100;` +/// For unmatched rows from `a`, the LEFT JOIN would produce a row with +/// `b.xx` set to NULL. The predicate `b.xx = 100` does not pass for those +/// rows, so the query does not need the LEFT JOIN's null-padded output and +/// the join can be rewritten as an inner join. /// -/// For rows when `b.xx` is null (as it would be after an outer join), -/// the `b.xx = 100` predicate filters them out and there is no -/// need to produce null rows for output. +/// The same reasoning can also simplify FULL joins to LEFT, RIGHT, or INNER +/// joins when filters remove the rows padded on one or both sides. /// -/// Generally, an outer join can be rewritten to inner join if the -/// filters from the WHERE clause return false while any inputs are -/// null and columns of those quals are come from nullable side of -/// outer join. +/// This rule looks for a filter above an outer join: +/// +/// ```text +/// Filter(predicate) +/// Join(LEFT/RIGHT/FULL) +/// ``` +/// +/// It also handles plan shapes where projection pruning has inserted one or +/// more Projection nodes between the filter and join: +/// +/// ```text +/// Filter(predicate over projection output) +/// Projection(...) +/// ... +/// Join(LEFT/RIGHT/FULL) +/// ``` +/// +/// In the projection case, the rule rewrites a copy of the predicate through +/// each Projection so it can analyze the predicate against the Join inputs. +/// The original filter predicate and Projection nodes are preserved when the +/// plan is rebuilt. #[derive(Default, Debug)] pub struct EliminateOuterJoin; @@ -77,61 +104,137 @@ impl OptimizerRule for EliminateOuterJoin { plan: LogicalPlan, _config: &dyn OptimizerConfig, ) -> Result> { - match plan { - LogicalPlan::Filter(mut filter) => match Arc::unwrap_or_clone(filter.input) { + let LogicalPlan::Filter(filter) = plan else { + return Ok(Transformed::no(plan)); + }; + + // Descend through one or more Projection nodes until we find a Join. + // For each Projection we encounter, rewrite a working copy of the + // predicate by replacing references to projection output columns with + // the expressions that define them. Keep the filter's original + // predicate intact for eventual use in the rebuilt plan; the rewritten + // predicate is used only for the null-rejection analysis. + let mut rewritten_predicate = filter.predicate.clone(); + let mut projections: Vec = Vec::new(); + let mut cur = Arc::clone(&filter.input); + + let new_join = loop { + match cur.as_ref() { + LogicalPlan::Projection(p) => { + rewritten_predicate = + inline_through_projection(rewritten_predicate, p)?; + let next = Arc::clone(&p.input); + projections.push(p.clone()); + cur = next; + } LogicalPlan::Join(join) => { - let mut null_rejecting_cols: Vec = vec![]; - - extract_null_rejecting_columns( - &filter.predicate, - &mut null_rejecting_cols, - join.left.schema(), - join.right.schema(), - true, - ); - - let new_join_type = if join.join_type.is_outer() { - let mut left_non_nullable = false; - let mut right_non_nullable = false; - for col in null_rejecting_cols.iter() { - if join.left.schema().has_column(col) { - left_non_nullable = true; - } - if join.right.schema().has_column(col) { - right_non_nullable = true; - } - } - eliminate_outer( - join.join_type, - left_non_nullable, - right_non_nullable, - ) - } else { - join.join_type + let Some(new_join) = try_simplify_join(join, &rewritten_predicate) + else { + return Ok(Transformed::no(LogicalPlan::Filter(filter))); }; - - let new_join = Arc::new(LogicalPlan::Join(Join { - left: join.left, - right: join.right, - join_type: new_join_type, - join_constraint: join.join_constraint, - on: join.on.clone(), - filter: join.filter.clone(), - schema: Arc::clone(&join.schema), - null_equality: join.null_equality, - null_aware: join.null_aware, - })); - Filter::try_new(filter.predicate, new_join) - .map(|f| Transformed::yes(LogicalPlan::Filter(f))) + break new_join; } - filter_input => { - filter.input = Arc::new(filter_input); - Ok(Transformed::no(LogicalPlan::Filter(filter))) + _ => { + return Ok(Transformed::no(LogicalPlan::Filter(filter))); } - }, - _ => Ok(Transformed::no(plan)), + } + }; + + let rebuilt_inner = rewrap_projections(new_join, projections); + Filter::try_new(filter.predicate, Arc::new(rebuilt_inner)) + .map(|f| Transformed::yes(LogicalPlan::Filter(f))) + } +} + +/// Run the null-rejection analysis on `predicate` against `join`'s left/right +/// schemas. Return `Some(new_join_plan)` if the join type can be tightened +/// (e.g. LEFT → INNER), `None` otherwise. +fn try_simplify_join(join: &Join, predicate: &Expr) -> Option { + if !join.join_type.is_outer() { + return None; + } + + let mut null_rejecting_cols: Vec = vec![]; + extract_null_rejecting_columns( + predicate, + &mut null_rejecting_cols, + join.left.schema(), + join.right.schema(), + true, + ); + + let mut left_non_nullable = false; + let mut right_non_nullable = false; + for col in null_rejecting_cols.iter() { + if join.left.schema().has_column(col) { + left_non_nullable = true; + } + if join.right.schema().has_column(col) { + right_non_nullable = true; } } + + let new_join_type = + eliminate_outer(join.join_type, left_non_nullable, right_non_nullable); + if new_join_type == join.join_type { + return None; + } + + Some(LogicalPlan::Join(Join { + left: Arc::clone(&join.left), + right: Arc::clone(&join.right), + join_type: new_join_type, + join_constraint: join.join_constraint, + on: join.on.clone(), + filter: join.filter.clone(), + schema: Arc::clone(&join.schema), + null_equality: join.null_equality, + null_aware: join.null_aware, + })) +} + +/// Substitute the projection's output column references in `predicate` with +/// the projection's defining expressions (stripped of any `Alias` wrapper). +/// The result expresses `predicate` over the projection's *input* schema. +/// +/// Unlike `PushDownFilter`, this rule does not change expression evaluation +/// behavior (in fact, the rewritten expressions are only used for analysis +/// purposes). Therefore, function volatility and `MoveTowardsLeafNodes` +/// placement can be ignored here. +fn inline_through_projection(predicate: Expr, p: &Projection) -> Result { + let mut map: HashMap = HashMap::new(); + for ((qualifier, field), expr) in p.schema.iter().zip(p.expr.iter()) { + map.insert( + qualified_name(qualifier, field.name()), + unalias(expr).clone(), + ); + } + replace_cols_by_name(predicate, &map) +} + +/// Re-attach a stack of projections above `new_inner`, restoring the original +/// plan shape with the new (possibly retyped) join at the bottom. Projection +/// schemas are reused as-is; only nullability of columns sourced from the +/// formerly-outer side may have changed, and the existing rule already takes +/// this looser-schema approach at the join itself. +fn rewrap_projections( + new_inner: LogicalPlan, + projections: Vec, +) -> LogicalPlan { + let mut current = new_inner; + for mut p in projections.into_iter().rev() { + p.input = Arc::new(current); + current = LogicalPlan::Projection(p); + } + current +} + +fn unalias(expr: &Expr) -> &Expr { + if let Expr::Alias(a) = expr { + unalias(&a.expr) + } else { + expr + } } pub fn eliminate_outer( @@ -139,28 +242,14 @@ pub fn eliminate_outer( left_non_nullable: bool, right_non_nullable: bool, ) -> JoinType { - let mut new_join_type = join_type; - match join_type { - JoinType::Left if right_non_nullable => { - new_join_type = JoinType::Inner; - } - JoinType::Left => {} - JoinType::Right if left_non_nullable => { - new_join_type = JoinType::Inner; - } - JoinType::Right => {} - JoinType::Full => { - if left_non_nullable && right_non_nullable { - new_join_type = JoinType::Inner; - } else if left_non_nullable { - new_join_type = JoinType::Left; - } else if right_non_nullable { - new_join_type = JoinType::Right; - } - } - _ => {} + match (join_type, left_non_nullable, right_non_nullable) { + (JoinType::Left, _, true) => JoinType::Inner, + (JoinType::Right, true, _) => JoinType::Inner, + (JoinType::Full, true, true) => JoinType::Inner, + (JoinType::Full, true, false) => JoinType::Left, + (JoinType::Full, false, true) => JoinType::Right, + _ => join_type, } - new_join_type } /// Find the columns that `expr` rejects NULL on. If any of these columns are @@ -1251,6 +1340,99 @@ mod tests { ") } + // ----- Filter pierces a Projection to reach the Join ----- + + #[test] + fn eliminate_left_through_projection() -> Result<()> { + let t1 = test_table_scan_with_name("t1")?; + let t2 = test_table_scan_with_name("t2")?; + + // Filter → Projection → LeftJoin is the shape produced by projection + // pruning in queries such as TPC-DS q49, where the post-join + // Projection sits between the filter and the join. This test also + // introduces an alias `bb` for t2.b, so it exercises inlining the + // filter predicate through the projection. + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Left, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .project(vec![col("t1.a"), col("t2.b").alias("bb")])? + .filter(col("bb").gt(lit(10u32)))? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Filter: bb > UInt32(10) + Projection: t1.a, t2.b AS bb + Inner Join: t1.a = t2.a + TableScan: t1 + TableScan: t2 + ") + } + + #[test] + fn no_eliminate_left_through_projection_with_or_cross_side() -> Result<()> { + let t1 = test_table_scan_with_name("t1")?; + let t2 = test_table_scan_with_name("t2")?; + + // After inlining the filter is still t1.b > 10 OR t2.b < 20, which + // is null-tolerant when t2 is NULL (the t1.b clause can still hold). + // The LEFT JOIN must be preserved. + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Left, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .project(vec![col("t1.b").alias("x"), col("t2.b").alias("y")])? + .filter(binary_expr( + col("x").gt(lit(10u32)), + Or, + col("y").lt(lit(20u32)), + ))? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Filter: x > UInt32(10) OR y < UInt32(20) + Projection: t1.b AS x, t2.b AS y + Left Join: t1.a = t2.a + TableScan: t1 + TableScan: t2 + ") + } + + #[test] + fn no_eliminate_left_through_projection_with_only_left_filter() -> Result<()> { + let t1 = test_table_scan_with_name("t1")?; + let t2 = test_table_scan_with_name("t2")?; + + // A filter that constrains only the preserved (left) side of a + // LEFT JOIN does not justify converting it to INNER — the LEFT + // would still pass nullable right-side rows that the filter + // accepts. + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Left, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .project(vec![col("t1.b").alias("x"), col("t2.b")])? + .filter(col("x").gt(lit(10u32)))? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Filter: x > UInt32(10) + Projection: t1.b AS x, t2.b + Left Join: t1.a = t2.a + TableScan: t1 + TableScan: t2 + ") + } + #[test] fn eliminate_left_with_arithmetic_predicate() -> Result<()> { let t1 = test_table_scan_with_name("t1")?; @@ -1283,7 +1465,6 @@ mod tests { TableScan: t2 ") } - #[test] fn eliminate_left_with_negative_predicate() -> Result<()> { let t1 = test_table_scan_with_name("t1")?; @@ -1368,4 +1549,33 @@ mod tests { TableScan: t2 ") } + + #[test] + fn no_eliminate_through_non_transparent() -> Result<()> { + let t1 = test_table_scan_with_name("t1")?; + let t2 = test_table_scan_with_name("t2")?; + + // Limit is intentionally not treated as transparent: a Limit below + // the Filter changes which rows survive, so swapping LEFT→INNER + // beneath it could yield a different surviving-row set even when + // the filter is null-rejecting on the right side. + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Left, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .limit(0, Some(5))? + .filter(col("t2.b").gt(lit(10u32)))? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Filter: t2.b > UInt32(10) + Limit: skip=0, fetch=5 + Left Join: t1.a = t2.a + TableScan: t1 + TableScan: t2 + ") + } } diff --git a/datafusion/sqllogictest/test_files/eliminate_outer_join.slt b/datafusion/sqllogictest/test_files/eliminate_outer_join.slt index d22a7f2e3ce42..584d8af419d11 100644 --- a/datafusion/sqllogictest/test_files/eliminate_outer_join.slt +++ b/datafusion/sqllogictest/test_files/eliminate_outer_join.slt @@ -538,6 +538,110 @@ select * from t1 left join t2 on t1.a = t2.x where (t2.y > 150) is true and t2.z ---- 2 20 b 2 200 q +### +### Projection between Filter and Join +### + +# A filter on a volatile, projected expression can still be used for outer join +# elimination. +query TT +explain +select s.a +from ( + select t1.a, random() + cast(t2.y as double) as ry + from t1 left join t2 on t1.a = t2.x +) s +where s.ry > 150.0; +---- +logical_plan +01)SubqueryAlias: s +02)--Projection: t1.a +03)----Filter: ry > Float64(150) +04)------Projection: t1.a, random() + CAST(t2.y AS Float64) AS ry +05)--------Inner Join: t1.a = t2.x +06)----------TableScan: t1 projection=[a] +07)----------TableScan: t2 projection=[x, y] + +query I rowsort +select s.a +from ( + select t1.a, random() + cast(t2.y as double) as ry + from t1 left join t2 on t1.a = t2.x +) s +where s.ry > 150.0; +---- +2 + +# This query has the shape of TPC-DS Q49: `OptimizeProjections` results in +# placing a `Projection` node between the `Filter` and `Join`, but we can look +# through that node to convert the outer join. +statement ok +create table d(k int, flag int); + +statement ok +insert into d values (1, 1), (2, 1), (3, 0); + +query TT +explain +select t1.a, sum(coalesce(t2.y, 0)) as ret_sum +from t1 left join t2 on t1.a = t2.x, d +where t2.y > 150 + and t1.a = d.k + and d.flag = 1 +group by t1.a; +---- +logical_plan +01)Projection: t1.a, sum(coalesce(t2.y,Int64(0))) AS ret_sum +02)--Aggregate: groupBy=[[t1.a]], aggr=[[sum(CASE WHEN __common_expr_1 IS NOT NULL THEN __common_expr_1 ELSE Int64(0) END) AS sum(coalesce(t2.y,Int64(0)))]] +03)----Projection: CAST(t2.y AS Int64) AS __common_expr_1, t1.a +04)------Inner Join: t1.a = d.k +05)--------Projection: t1.a, t2.y +06)----------Inner Join: t1.a = t2.x +07)------------TableScan: t1 projection=[a] +08)------------Filter: t2.y > Int32(150) +09)--------------TableScan: t2 projection=[x, y] +10)--------Projection: d.k +11)----------Filter: d.flag = Int32(1) +12)------------TableScan: d projection=[k, flag] + +query II rowsort +select t1.a, sum(coalesce(t2.y, 0)) as ret_sum +from t1 left join t2 on t1.a = t2.x, d +where t2.y > 150 + and t1.a = d.k + and d.flag = 1 +group by t1.a; +---- +2 200 + +# A CTE can introduce a query boundary between the outer filter and the +# LEFT JOIN. +query TT +explain +with s as ( + select t1.a, t2.y + from t1 left join t2 on t1.a = t2.x +) +select s.a from s where s.y > 150; +---- +logical_plan +01)SubqueryAlias: s +02)--Projection: t1.a +03)----Inner Join: t1.a = t2.x +04)------TableScan: t1 projection=[a] +05)------Projection: t2.x +06)--------Filter: t2.y > Int32(150) +07)----------TableScan: t2 projection=[x, y] + +query I rowsort +with s as ( + select t1.a, t2.y + from t1 left join t2 on t1.a = t2.x +) +select s.a from s where s.y > 150; +---- +2 + ### ### Cleanup ### @@ -550,3 +654,6 @@ drop table t1; statement ok drop table t2; + +statement ok +drop table d; From fc7f1659d1567b80dafcd206ec280f45e877b9a2 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Tue, 26 May 2026 12:03:41 -0400 Subject: [PATCH 2/2] Tweak comment --- datafusion/optimizer/src/eliminate_outer_join.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index 180fad7893009..4691eaf48b0b9 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -1349,9 +1349,7 @@ mod tests { // Filter → Projection → LeftJoin is the shape produced by projection // pruning in queries such as TPC-DS q49, where the post-join - // Projection sits between the filter and the join. This test also - // introduces an alias `bb` for t2.b, so it exercises inlining the - // filter predicate through the projection. + // Projection sits between the filter and the join. let plan = LogicalPlanBuilder::from(t1) .join( t2,