diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index 748b04d5cf718..4691eaf48b0b9 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -15,39 +15,66 @@ // specific language governing permissions and limitations // under the License. -//! [`EliminateOuterJoin`] converts `LEFT/RIGHT/FULL` joins to `INNER` joins +//! [`EliminateOuterJoin`] rewrites outer joins to simpler join types when +//! filters make the outer rows unnecessary (e.g. `LEFT`/`RIGHT` to `INNER`, +//! and `FULL` to `LEFT`/`RIGHT`/`INNER`). +use crate::push_down_filter::replace_cols_by_name; use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_common::{Column, DFSchema, Result}; -use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan}; +use datafusion_common::{Column, DFSchema, Result, qualified_name}; +use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan, Projection}; use datafusion_expr::{Expr, Filter, Operator}; use crate::optimizer::ApplyOrder; use datafusion_common::tree_node::Transformed; use datafusion_expr::expr::{BinaryExpr, Cast, InList, Like, TryCast}; +use std::collections::HashMap; use std::sync::Arc; +/// Attempt to simplify outer joins when filters make their null-padded +/// rows impossible to observe. /// -/// Attempt to replace outer joins with inner joins. +/// Outer joins are generally more expensive than inner joins and can block +/// predicate pushdown and other optimizations. When a filter above an outer +/// join removes every row the join would add for unmatched input rows, the +/// join can be changed to a cheaper join type. /// -/// Outer joins are typically more expensive to compute at runtime -/// than inner joins and prevent various forms of predicate pushdown -/// and other optimizations, so removing them if possible is beneficial. +/// For example: /// -/// Inner joins filter out rows that do match. 
Outer joins pass rows -/// that do not match padded with nulls. If there is a filter in the -/// query that would filter any such null rows after the join the rows -/// introduced by the outer join are filtered. +/// ```sql +/// SELECT ... +/// FROM a LEFT JOIN b ON ... +/// WHERE b.xx = 100 +/// ``` /// -/// For example, in the `select ... from a left join b on ... where b.xx = 100;` +/// For unmatched rows from `a`, the LEFT JOIN would produce a row with +/// `b.xx` set to NULL. The predicate `b.xx = 100` does not pass for those +/// rows, so the query does not need the LEFT JOIN's null-padded output and +/// the join can be rewritten as an inner join. /// -/// For rows when `b.xx` is null (as it would be after an outer join), -/// the `b.xx = 100` predicate filters them out and there is no -/// need to produce null rows for output. +/// The same reasoning can also simplify FULL joins to LEFT, RIGHT, or INNER +/// joins when filters remove the rows padded on one or both sides. /// -/// Generally, an outer join can be rewritten to inner join if the -/// filters from the WHERE clause return false while any inputs are -/// null and columns of those quals are come from nullable side of -/// outer join. +/// This rule looks for a filter above an outer join: +/// +/// ```text +/// Filter(predicate) +/// Join(LEFT/RIGHT/FULL) +/// ``` +/// +/// It also handles plan shapes where projection pruning has inserted one or +/// more Projection nodes between the filter and join: +/// +/// ```text +/// Filter(predicate over projection output) +/// Projection(...) +/// ... +/// Join(LEFT/RIGHT/FULL) +/// ``` +/// +/// In the projection case, the rule rewrites a copy of the predicate through +/// each Projection so it can analyze the predicate against the Join inputs. +/// The original filter predicate and Projection nodes are preserved when the +/// plan is rebuilt. 
#[derive(Default, Debug)] pub struct EliminateOuterJoin; @@ -77,61 +104,137 @@ impl OptimizerRule for EliminateOuterJoin { plan: LogicalPlan, _config: &dyn OptimizerConfig, ) -> Result<Transformed<LogicalPlan>> { - match plan { - LogicalPlan::Filter(mut filter) => match Arc::unwrap_or_clone(filter.input) { + let LogicalPlan::Filter(filter) = plan else { + return Ok(Transformed::no(plan)); + }; + + // Descend through one or more Projection nodes until we find a Join. + // For each Projection we encounter, rewrite a working copy of the + // predicate by replacing references to projection output columns with + // the expressions that define them. Keep the filter's original + // predicate intact for eventual use in the rebuilt plan; the rewritten + // predicate is used only for the null-rejection analysis. + let mut rewritten_predicate = filter.predicate.clone(); + let mut projections: Vec<Projection> = Vec::new(); + let mut cur = Arc::clone(&filter.input); + + let new_join = loop { + match cur.as_ref() { + LogicalPlan::Projection(p) => { + rewritten_predicate = + inline_through_projection(rewritten_predicate, p)?; + let next = Arc::clone(&p.input); + projections.push(p.clone()); + cur = next; + } LogicalPlan::Join(join) => { - let mut null_rejecting_cols: Vec<Column> = vec![]; - - extract_null_rejecting_columns( - &filter.predicate, - &mut null_rejecting_cols, - join.left.schema(), - join.right.schema(), - true, - ); - - let new_join_type = if join.join_type.is_outer() { - let mut left_non_nullable = false; - let mut right_non_nullable = false; - for col in null_rejecting_cols.iter() { - if join.left.schema().has_column(col) { - left_non_nullable = true; - } - if join.right.schema().has_column(col) { - right_non_nullable = true; - } - } - eliminate_outer( - join.join_type, - left_non_nullable, - right_non_nullable, - ) - } else { - join.join_type + let Some(new_join) = try_simplify_join(join, &rewritten_predicate) + else { + return Ok(Transformed::no(LogicalPlan::Filter(filter))); }; - - let new_join =
Arc::new(LogicalPlan::Join(Join { - left: join.left, - right: join.right, - join_type: new_join_type, - join_constraint: join.join_constraint, - on: join.on.clone(), - filter: join.filter.clone(), - schema: Arc::clone(&join.schema), - null_equality: join.null_equality, - null_aware: join.null_aware, - })); - Filter::try_new(filter.predicate, new_join) - .map(|f| Transformed::yes(LogicalPlan::Filter(f))) + break new_join; } - filter_input => { - filter.input = Arc::new(filter_input); - Ok(Transformed::no(LogicalPlan::Filter(filter))) + _ => { + return Ok(Transformed::no(LogicalPlan::Filter(filter))); } - }, - _ => Ok(Transformed::no(plan)), + } + }; + + let rebuilt_inner = rewrap_projections(new_join, projections); + Filter::try_new(filter.predicate, Arc::new(rebuilt_inner)) + .map(|f| Transformed::yes(LogicalPlan::Filter(f))) + } +} + +/// Run the null-rejection analysis on `predicate` against `join`'s left/right +/// schemas. Return `Some(new_join_plan)` if the join type can be tightened +/// (e.g. LEFT → INNER), `None` otherwise. 
+fn try_simplify_join(join: &Join, predicate: &Expr) -> Option<LogicalPlan> { + if !join.join_type.is_outer() { + return None; + } + + let mut null_rejecting_cols: Vec<Column> = vec![]; + extract_null_rejecting_columns( + predicate, + &mut null_rejecting_cols, + join.left.schema(), + join.right.schema(), + true, + ); + + let mut left_non_nullable = false; + let mut right_non_nullable = false; + for col in null_rejecting_cols.iter() { + if join.left.schema().has_column(col) { + left_non_nullable = true; + } + if join.right.schema().has_column(col) { + right_non_nullable = true; } } + + let new_join_type = + eliminate_outer(join.join_type, left_non_nullable, right_non_nullable); + if new_join_type == join.join_type { + return None; + } + + Some(LogicalPlan::Join(Join { + left: Arc::clone(&join.left), + right: Arc::clone(&join.right), + join_type: new_join_type, + join_constraint: join.join_constraint, + on: join.on.clone(), + filter: join.filter.clone(), + schema: Arc::clone(&join.schema), + null_equality: join.null_equality, + null_aware: join.null_aware, + })) +} + +/// Substitute the projection's output column references in `predicate` with +/// the projection's defining expressions (stripped of any `Alias` wrapper). +/// The result expresses `predicate` over the projection's *input* schema. +/// +/// Unlike `PushDownFilter`, this rule does not change expression evaluation +/// behavior (in fact, the rewritten expressions are only used for analysis +/// purposes). Therefore, function volatility and `MoveTowardsLeafNodes` +/// placement can be ignored here.
+fn inline_through_projection(predicate: Expr, p: &Projection) -> Result<Expr> { + let mut map: HashMap<String, Expr> = HashMap::new(); + for ((qualifier, field), expr) in p.schema.iter().zip(p.expr.iter()) { + map.insert( + qualified_name(qualifier, field.name()), + unalias(expr).clone(), + ); + } + replace_cols_by_name(predicate, &map) +} + +/// Re-attach a stack of projections above `new_inner`, restoring the original +/// plan shape with the new (possibly retyped) join at the bottom. Projection +/// schemas are reused as-is; only nullability of columns sourced from the +/// formerly-outer side may have changed, and the existing rule already takes +/// this looser-schema approach at the join itself. +fn rewrap_projections( + new_inner: LogicalPlan, + projections: Vec<Projection>, +) -> LogicalPlan { + let mut current = new_inner; + for mut p in projections.into_iter().rev() { + p.input = Arc::new(current); + current = LogicalPlan::Projection(p); + } + current +} + +fn unalias(expr: &Expr) -> &Expr { + if let Expr::Alias(a) = expr { + unalias(&a.expr) + } else { + expr + } } pub fn eliminate_outer( @@ -139,28 +242,14 @@ pub fn eliminate_outer( left_non_nullable: bool, right_non_nullable: bool, ) -> JoinType { - let mut new_join_type = join_type; - match join_type { - JoinType::Left if right_non_nullable => { - new_join_type = JoinType::Inner; - } - JoinType::Left => {} - JoinType::Right if left_non_nullable => { - new_join_type = JoinType::Inner; - } - JoinType::Right => {} - JoinType::Full => { - if left_non_nullable && right_non_nullable { - new_join_type = JoinType::Inner; - } else if left_non_nullable { - new_join_type = JoinType::Left; - } else if right_non_nullable { - new_join_type = JoinType::Right; - } - } - _ => {} + match (join_type, left_non_nullable, right_non_nullable) { + (JoinType::Left, _, true) => JoinType::Inner, + (JoinType::Right, true, _) => JoinType::Inner, + (JoinType::Full, true, true) => JoinType::Inner, + (JoinType::Full, true, false) => JoinType::Left, +
(JoinType::Full, false, true) => JoinType::Right, + _ => join_type, } - new_join_type } /// Find the columns that `expr` rejects NULL on. If any of these columns are @@ -1251,6 +1340,97 @@ mod tests { ") } + // ----- Filter pierces a Projection to reach the Join ----- + + #[test] + fn eliminate_left_through_projection() -> Result<()> { + let t1 = test_table_scan_with_name("t1")?; + let t2 = test_table_scan_with_name("t2")?; + + // Filter → Projection → LeftJoin is the shape produced by projection + // pruning in queries such as TPC-DS q49, where the post-join + // Projection sits between the filter and the join. + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Left, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .project(vec![col("t1.a"), col("t2.b").alias("bb")])? + .filter(col("bb").gt(lit(10u32)))? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Filter: bb > UInt32(10) + Projection: t1.a, t2.b AS bb + Inner Join: t1.a = t2.a + TableScan: t1 + TableScan: t2 + ") + } + + #[test] + fn no_eliminate_left_through_projection_with_or_cross_side() -> Result<()> { + let t1 = test_table_scan_with_name("t1")?; + let t2 = test_table_scan_with_name("t2")?; + + // After inlining the filter is still t1.b > 10 OR t2.b < 20, which + // is null-tolerant when t2 is NULL (the t1.b clause can still hold). + // The LEFT JOIN must be preserved. + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Left, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .project(vec![col("t1.b").alias("x"), col("t2.b").alias("y")])? + .filter(binary_expr( + col("x").gt(lit(10u32)), + Or, + col("y").lt(lit(20u32)), + ))? 
+ .build()?; + + assert_optimized_plan_equal!(plan, @r" + Filter: x > UInt32(10) OR y < UInt32(20) + Projection: t1.b AS x, t2.b AS y + Left Join: t1.a = t2.a + TableScan: t1 + TableScan: t2 + ") + } + + #[test] + fn no_eliminate_left_through_projection_with_only_left_filter() -> Result<()> { + let t1 = test_table_scan_with_name("t1")?; + let t2 = test_table_scan_with_name("t2")?; + + // A filter that constrains only the preserved (left) side of a + // LEFT JOIN does not justify converting it to INNER — the LEFT + // would still pass nullable right-side rows that the filter + // accepts. + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Left, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .project(vec![col("t1.b").alias("x"), col("t2.b")])? + .filter(col("x").gt(lit(10u32)))? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Filter: x > UInt32(10) + Projection: t1.b AS x, t2.b + Left Join: t1.a = t2.a + TableScan: t1 + TableScan: t2 + ") + } + #[test] fn eliminate_left_with_arithmetic_predicate() -> Result<()> { let t1 = test_table_scan_with_name("t1")?; @@ -1283,7 +1463,6 @@ mod tests { TableScan: t2 ") } - #[test] fn eliminate_left_with_negative_predicate() -> Result<()> { let t1 = test_table_scan_with_name("t1")?; @@ -1368,4 +1547,33 @@ mod tests { TableScan: t2 ") } + + #[test] + fn no_eliminate_through_non_transparent() -> Result<()> { + let t1 = test_table_scan_with_name("t1")?; + let t2 = test_table_scan_with_name("t2")?; + + // Limit is intentionally not treated as transparent: a Limit below + // the Filter changes which rows survive, so swapping LEFT→INNER + // beneath it could yield a different surviving-row set even when + // the filter is null-rejecting on the right side. + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Left, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .limit(0, Some(5))? + .filter(col("t2.b").gt(lit(10u32)))? 
+ .build()?; + + assert_optimized_plan_equal!(plan, @r" + Filter: t2.b > UInt32(10) + Limit: skip=0, fetch=5 + Left Join: t1.a = t2.a + TableScan: t1 + TableScan: t2 + ") + } } diff --git a/datafusion/sqllogictest/test_files/eliminate_outer_join.slt b/datafusion/sqllogictest/test_files/eliminate_outer_join.slt index d22a7f2e3ce42..584d8af419d11 100644 --- a/datafusion/sqllogictest/test_files/eliminate_outer_join.slt +++ b/datafusion/sqllogictest/test_files/eliminate_outer_join.slt @@ -538,6 +538,110 @@ select * from t1 left join t2 on t1.a = t2.x where (t2.y > 150) is true and t2.z ---- 2 20 b 2 200 q +### +### Projection between Filter and Join +### + +# A filter on a volatile, projected expression can still be used for outer join +# elimination. +query TT +explain +select s.a +from ( + select t1.a, random() + cast(t2.y as double) as ry + from t1 left join t2 on t1.a = t2.x +) s +where s.ry > 150.0; +---- +logical_plan +01)SubqueryAlias: s +02)--Projection: t1.a +03)----Filter: ry > Float64(150) +04)------Projection: t1.a, random() + CAST(t2.y AS Float64) AS ry +05)--------Inner Join: t1.a = t2.x +06)----------TableScan: t1 projection=[a] +07)----------TableScan: t2 projection=[x, y] + +query I rowsort +select s.a +from ( + select t1.a, random() + cast(t2.y as double) as ry + from t1 left join t2 on t1.a = t2.x +) s +where s.ry > 150.0; +---- +2 + +# This query has the shape of TPC-DS Q49: `OptimizeProjections` results in +# placing a `Projection` node between the `Filter` and `Join`, but we can look +# through that node to convert the outer join. 
+statement ok +create table d(k int, flag int); + +statement ok +insert into d values (1, 1), (2, 1), (3, 0); + +query TT +explain +select t1.a, sum(coalesce(t2.y, 0)) as ret_sum +from t1 left join t2 on t1.a = t2.x, d +where t2.y > 150 + and t1.a = d.k + and d.flag = 1 +group by t1.a; +---- +logical_plan +01)Projection: t1.a, sum(coalesce(t2.y,Int64(0))) AS ret_sum +02)--Aggregate: groupBy=[[t1.a]], aggr=[[sum(CASE WHEN __common_expr_1 IS NOT NULL THEN __common_expr_1 ELSE Int64(0) END) AS sum(coalesce(t2.y,Int64(0)))]] +03)----Projection: CAST(t2.y AS Int64) AS __common_expr_1, t1.a +04)------Inner Join: t1.a = d.k +05)--------Projection: t1.a, t2.y +06)----------Inner Join: t1.a = t2.x +07)------------TableScan: t1 projection=[a] +08)------------Filter: t2.y > Int32(150) +09)--------------TableScan: t2 projection=[x, y] +10)--------Projection: d.k +11)----------Filter: d.flag = Int32(1) +12)------------TableScan: d projection=[k, flag] + +query II rowsort +select t1.a, sum(coalesce(t2.y, 0)) as ret_sum +from t1 left join t2 on t1.a = t2.x, d +where t2.y > 150 + and t1.a = d.k + and d.flag = 1 +group by t1.a; +---- +2 200 + +# A CTE can introduce a query boundary between the outer filter and the +# LEFT JOIN. +query TT +explain +with s as ( + select t1.a, t2.y + from t1 left join t2 on t1.a = t2.x +) +select s.a from s where s.y > 150; +---- +logical_plan +01)SubqueryAlias: s +02)--Projection: t1.a +03)----Inner Join: t1.a = t2.x +04)------TableScan: t1 projection=[a] +05)------Projection: t2.x +06)--------Filter: t2.y > Int32(150) +07)----------TableScan: t2 projection=[x, y] + +query I rowsort +with s as ( + select t1.a, t2.y + from t1 left join t2 on t1.a = t2.x +) +select s.a from s where s.y > 150; +---- +2 + ### ### Cleanup ### @@ -550,3 +654,6 @@ drop table t1; statement ok drop table t2; + +statement ok +drop table d;