From 8416100b679388aeac86a2bbf98264a8e1278962 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Tue, 26 May 2026 15:59:43 +0200 Subject: [PATCH 1/5] Gate new ScalarSubqueryExec node behind session property --- datafusion/common/src/config.rs | 7 + datafusion/core/src/physical_planner.rs | 15 ++- .../optimizer/src/scalar_subquery_to_join.rs | 120 +++++++++++++++--- docs/source/user-guide/configs.md | 1 + 4 files changed, 121 insertions(+), 22 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index e6d1ebbbbe746..9f8e41fa6d19a 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1124,6 +1124,13 @@ config_namespace! { /// into the file scan phase. pub enable_topk_dynamic_filter_pushdown: bool, default = true + /// When set to true, uncorrelated scalar subqueries are left in + /// the logical plan and executed by `ScalarSubqueryExec` during physical + /// execution. When set to false, all scalar subqueries (including + /// uncorrelated ones) are rewritten to left joins by the + /// `ScalarSubqueryToJoin` optimizer rule. + pub physical_uncorrelated_scalar_subquery: bool, default = true + /// When set to true, the optimizer will attempt to push down Join dynamic filters /// into the file scan phase. 
pub enable_join_dynamic_filter_pushdown: bool, default = true diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index ee97309c27aae..4ebd25aa47b7d 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -437,7 +437,20 @@ impl DefaultPhysicalPlanner { session_state: &'a SessionState, ) -> futures::future::BoxFuture<'a, Result>> { Box::pin(async move { - let all_subqueries = Self::collect_scalar_subqueries(logical_plan); + // When `physical_uncorrelated_scalar_subquery` is disabled, the + // `ScalarSubqueryToJoin` optimizer rule rewrites all uncorrelated + // scalar subqueries to joins, so none should reach this point. + // Skip collection in that case to avoid creating a no-op + // `ScalarSubqueryExec` wrapper. + let all_subqueries = if session_state + .config_options() + .optimizer + .physical_uncorrelated_scalar_subquery + { + Self::collect_scalar_subqueries(logical_plan) + } else { + Vec::new() + }; let (links, index_map) = self .plan_scalar_subqueries(all_subqueries, session_state) .await?; diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index fee430047ab7c..c4a225a472575 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -63,10 +63,12 @@ impl ScalarSubqueryToJoin { &self, predicate: &Expr, alias_gen: &Arc, + physical_uncorrelated: bool, ) -> Result<(Vec<(Subquery, String)>, Expr)> { let mut extract = ExtractScalarSubQuery { sub_query_info: vec![], alias_gen, + physical_uncorrelated, }; predicate .clone() @@ -88,15 +90,23 @@ impl OptimizerRule for ScalarSubqueryToJoin { ) -> Result> { match plan { LogicalPlan::Filter(filter) => { + let physical_uncorrelated = config + .options() + .optimizer + .physical_uncorrelated_scalar_subquery; // Optimization: skip the rest of the rule and its copies if - // there are no scalar subqueries - 
if !contains_correlated_scalar_subquery(&filter.predicate) { + // there are no scalar subqueries this rule should rewrite + if !contains_scalar_subquery_to_rewrite( + &filter.predicate, + physical_uncorrelated, + ) { return Ok(Transformed::no(LogicalPlan::Filter(filter))); } let (subqueries, mut rewrite_expr) = self.extract_subquery_exprs( &filter.predicate, config.alias_generator(), + physical_uncorrelated, )?; assert_or_internal_err!( @@ -141,13 +151,15 @@ impl OptimizerRule for ScalarSubqueryToJoin { Ok(Transformed::yes(new_plan)) } LogicalPlan::Projection(projection) => { + let physical_uncorrelated = config + .options() + .optimizer + .physical_uncorrelated_scalar_subquery; // Optimization: skip the rest of the rule and its copies if there - // are no correlated scalar subqueries - if !projection - .expr - .iter() - .any(contains_correlated_scalar_subquery) - { + // are no scalar subqueries this rule should rewrite + if !projection.expr.iter().any(|expr| { + contains_scalar_subquery_to_rewrite(expr, physical_uncorrelated) + }) { return Ok(Transformed::no(LogicalPlan::Projection(projection))); } @@ -156,8 +168,11 @@ impl OptimizerRule for ScalarSubqueryToJoin { let mut rewrite_exprs: Vec = Vec::with_capacity(projection.expr.len()); for (idx, expr) in projection.expr.iter().enumerate() { - let (subqueries, rewrite_expr) = - self.extract_subquery_exprs(expr, config.alias_generator())?; + let (subqueries, rewrite_expr) = self.extract_subquery_exprs( + expr, + config.alias_generator(), + physical_uncorrelated, + )?; for (_, alias) in &subqueries { alias_to_index.insert(alias.clone(), idx); } @@ -228,12 +243,20 @@ impl OptimizerRule for ScalarSubqueryToJoin { } } -/// Returns true if the expression contains a correlated scalar subquery, false -/// otherwise. Uncorrelated scalar subqueries are handled by the physical -/// planner via `ScalarSubqueryExec` and do not need to be converted to joins. 
-fn contains_correlated_scalar_subquery(expr: &Expr) -> bool { +/// Returns true if the expression contains a scalar subquery that this rule +/// should rewrite to a join. +/// +/// When `physical_uncorrelated_scalar_subquery` is true (the default) only +/// correlated scalar subqueries are rewritten — uncorrelated ones are handled +/// by the physical planner via `ScalarSubqueryExec`. When it is false, all +/// scalar subqueries (correlated and uncorrelated) are rewritten. +fn contains_scalar_subquery_to_rewrite(expr: &Expr, physical_uncorrelated: bool) -> bool { expr.exists(|expr| { - Ok(matches!(expr, Expr::ScalarSubquery(sq) if !sq.outer_ref_columns.is_empty())) + Ok(matches!( + expr, + Expr::ScalarSubquery(sq) + if !physical_uncorrelated || !sq.outer_ref_columns.is_empty() + )) }) .expect("Inner is always Ok") } @@ -241,6 +264,7 @@ fn contains_correlated_scalar_subquery(expr: &Expr) -> bool { struct ExtractScalarSubQuery<'a> { sub_query_info: Vec<(Subquery, String)>, alias_gen: &'a Arc, + physical_uncorrelated: bool, } impl TreeNodeRewriter for ExtractScalarSubQuery<'_> { @@ -248,9 +272,13 @@ impl TreeNodeRewriter for ExtractScalarSubQuery<'_> { fn f_down(&mut self, expr: Expr) -> Result> { match expr { - // Skip uncorrelated scalar subqueries + // Match scalar subqueries this rule should rewrite to a join. When + // `physical_uncorrelated` is true, only correlated subqueries are + // rewritten — uncorrelated ones are handled later by the physical + // planner. When false, both are rewritten. 
Expr::ScalarSubquery(ref subquery) - if !subquery.outer_ref_columns.is_empty() => + if !self.physical_uncorrelated + || !subquery.outer_ref_columns.is_empty() => { let subquery = subquery.clone(); let scalar_expr = subquery @@ -308,10 +336,9 @@ fn build_join( outer_input: &LogicalPlan, subquery_alias: &str, ) -> Result)>> { - assert_or_internal_err!( - !subquery.outer_ref_columns.is_empty(), - "build_join should only be called for correlated subqueries" - ); + // `build_join` also handles uncorrelated scalar subqueries (as a left + // join with `Boolean(true)`) when the + // `physical_uncorrelated_scalar_subquery` option is disabled. let subquery_plan = subquery.subquery.as_ref(); let mut pull_up = PullUpCorrelatedExpr::new().with_need_handle_count_bug(true); let decorrelated_subquery = subquery_plan.clone().rewrite(&mut pull_up).data()?; @@ -1159,4 +1186,55 @@ mod tests { " ) } + + /// When `physical_uncorrelated_scalar_subquery` is disabled, uncorrelated + /// scalar subqueries are rewritten to left joins by this rule (the + /// pre-DataFusion 54 behavior), instead of being left in the plan for + /// `ScalarSubqueryExec` to execute. + #[test] + fn uncorrelated_scalar_subquery_rewritten_when_flag_off() -> Result<()> { + use datafusion_common::config::ConfigOptions; + + let sq = Arc::new( + LogicalPlanBuilder::from(scan_tpch_table("orders")) + .aggregate(Vec::::new(), vec![max(col("orders.o_custkey"))])? + .project(vec![max(col("orders.o_custkey"))])? + .build()?, + ); + + let plan = LogicalPlanBuilder::from(scan_tpch_table("customer")) + .filter(col("customer.c_custkey").eq(scalar_subquery(sq)))? + .project(vec![col("customer.c_custkey")])? 
+ .build()?; + + let mut options = ConfigOptions::default(); + options.optimizer.filter_null_join_keys = true; + options.optimizer.physical_uncorrelated_scalar_subquery = false; + let context = crate::OptimizerContext::new_with_config_options(Arc::new(options)); + + let rule: Arc = + Arc::new(ScalarSubqueryToJoin::new()); + let optimizer = crate::Optimizer::with_rules(vec![rule]); + let optimized_plan = optimizer + .optimize(plan, &context, |_, _| {}) + .expect("failed to optimize plan"); + let formatted_plan = optimized_plan.display_indent_schema(); + + insta::assert_snapshot!( + formatted_plan, + @r" + Projection: customer.c_custkey [c_custkey:Int64] + Projection: customer.c_custkey, customer.c_name [c_custkey:Int64, c_name:Utf8] + Filter: customer.c_custkey = __scalar_sq_1.max(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N] + Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N] + TableScan: customer [c_custkey:Int64, c_name:Utf8] + SubqueryAlias: __scalar_sq_1 [max(orders.o_custkey):Int64;N] + Projection: max(orders.o_custkey) [max(orders.o_custkey):Int64;N] + Aggregate: groupBy=[[]], aggr=[[max(orders.o_custkey)]] [max(orders.o_custkey):Int64;N] + TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] + " + ); + + Ok(()) + } } diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 576137bda29d1..53ebb72dc9f06 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -144,6 +144,7 @@ The following configuration settings are available: | datafusion.optimizer.enable_window_topn | false | When set to true, the optimizer will replace Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a PartitionedTopKExec that maintains per-partition heaps, avoiding a full sort of the input. When the window partition key has low cardinality, enabling this optimization can improve performance. 
However, for high cardinality keys, it may cause regressions in both memory usage and runtime. | | datafusion.optimizer.enable_topk_repartition | true | When set to true, the optimizer will push TopK (Sort with fetch) below hash repartition when the partition key is a prefix of the sort key, reducing data volume before the shuffle. | | datafusion.optimizer.enable_topk_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase. | +| datafusion.optimizer.physical_uncorrelated_scalar_subquery | true | When set to true, uncorrelated scalar subqueries are left in the logical plan and executed by `ScalarSubqueryExec` during physical execution. When set to false, all scalar subqueries (including uncorrelated ones) are rewritten to left joins by the `ScalarSubqueryToJoin` optimizer rule. | | datafusion.optimizer.enable_join_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. | | datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. | | datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. 
The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. | From ec132965e14662bcda745bc9b613c6faa24dad4a Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Tue, 26 May 2026 16:44:16 +0200 Subject: [PATCH 2/5] Fix expected results in information_schema.slt --- datafusion/sqllogictest/test_files/information_schema.slt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b0c7e3f8fe643..ec17b1b256640 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -320,6 +320,7 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576 datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 datafusion.optimizer.join_reordering true datafusion.optimizer.max_passes 3 +datafusion.optimizer.physical_uncorrelated_scalar_subquery true datafusion.optimizer.prefer_existing_sort false datafusion.optimizer.prefer_existing_union false datafusion.optimizer.prefer_hash_join true @@ -470,6 +471,7 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum es datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition datafusion.optimizer.join_reordering true When set to true, the physical plan optimizer may swap join inputs based on statistics. When set to false, statistics-driven join input reordering is disabled and the original join order in the query is used. 
datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan +datafusion.optimizer.physical_uncorrelated_scalar_subquery true When set to true, uncorrelated scalar subqueries are left in the logical plan and executed by `ScalarSubqueryExec` during physical execution. When set to false, all scalar subqueries (including uncorrelated ones) are rewritten to left joins by the `ScalarSubqueryToJoin` optimizer rule. datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory From ddc20cdbca226cdb7807cae80c3fcf30fc17c42f Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Tue, 26 May 2026 16:53:16 +0200 Subject: [PATCH 3/5] Add test cases in subquery.slt with the flag set to false --- .../sqllogictest/test_files/subquery.slt | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 25f124f217cbf..ff39528c4a73b 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -2091,6 +2091,95 @@ SELECT (SELECT v FROM (SELECT 1 AS v UNION ALL SELECT 2) AS t ORDER BY v LIMIT 1 ---- 1 +############# +## End-to-end correctness coverage for the flag-off path. 
+## When `datafusion.optimizer.physical_uncorrelated_scalar_subquery` is false, +## uncorrelated scalar subqueries are rewritten to left joins by +## `ScalarSubqueryToJoin` instead of executed by `ScalarSubqueryExec`. This +## restores pre-PR-21240 behavior, which has three known shortcomings the +## physical-execution path was built to fix: multi-row subqueries silently +## return wrong results, and uncorrelated scalar subqueries do not work in +## ORDER BY / JOIN ON / aggregate-function arguments. Those cases are +## intentionally not covered here; the queries below are the ones where both +## paths agree. +############# + +statement ok +set datafusion.optimizer.physical_uncorrelated_scalar_subquery = false; + +# Scalar subquery returning exactly one row → success +query I +SELECT (SELECT v FROM sq_values LIMIT 1); +---- +1 + +# Scalar subquery returning exactly one row in WHERE → success +query I rowsort +SELECT x FROM sq_main WHERE x > (SELECT v FROM sq_values LIMIT 1); +---- +10 +20 + +# Scalar subquery returning zero rows → NULL +query I +SELECT (SELECT v FROM sq_empty); +---- +NULL + +# Scalar subquery returning zero rows in arithmetic → NULL propagation +query I +SELECT x + (SELECT v FROM sq_empty) FROM sq_main; +---- +NULL +NULL + +# Scalar subquery returning zero rows in WHERE comparison → no matching rows +query I +SELECT x FROM sq_main WHERE x > (SELECT v FROM sq_empty); +---- + +# Aggregated subquery always returns one row, even on empty input → success +query I +SELECT (SELECT count(*) FROM sq_empty); +---- +0 + +# Aggregated subquery on multi-row table → success +query I +SELECT (SELECT max(v) FROM sq_values); +---- +3 + +# HAVING clause with uncorrelated scalar subquery +query II rowsort +SELECT x, count(*) AS cnt FROM sq_main GROUP BY x +HAVING count(*) > (SELECT min(v) FROM sq_values); +---- + +# CASE WHEN with uncorrelated scalar subquery as condition +query T rowsort +SELECT CASE WHEN x > (SELECT min(v) FROM sq_values) + THEN 'big' ELSE 'small' 
END AS label +FROM sq_main; +---- +big +big + +# Doubly-nested constant subquery +query I +SELECT (SELECT (SELECT 42)); +---- +42 + +# NULL comparison semantics through subquery boundary +query B +SELECT 1 = (SELECT CAST(NULL AS INT)); +---- +NULL + +statement ok +RESET datafusion.optimizer.physical_uncorrelated_scalar_subquery; + statement count 0 DROP TABLE sq_values; From d1b9dade876c2df42cbaf5119b8136cfa8c49943 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Tue, 26 May 2026 17:50:41 +0200 Subject: [PATCH 4/5] test tpch with flag turned off --- datafusion/optimizer/src/scalar_subquery_to_join.rs | 4 ---- datafusion/sqllogictest/test_files/tpch/tpch.slt | 9 +++++++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index c4a225a472575..8fb3a38bdc070 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -1187,10 +1187,6 @@ mod tests { ) } - /// When `physical_uncorrelated_scalar_subquery` is disabled, uncorrelated - /// scalar subqueries are rewritten to left joins by this rule (the - /// pre-DataFusion 54 behavior), instead of being left in the plan for - /// `ScalarSubqueryExec` to execute. 
#[test] fn uncorrelated_scalar_subquery_rewritten_when_flag_off() -> Result<()> { use datafusion_common::config::ConfigOptions; diff --git a/datafusion/sqllogictest/test_files/tpch/tpch.slt b/datafusion/sqllogictest/test_files/tpch/tpch.slt index 764285784aa50..ecf339c466242 100644 --- a/datafusion/sqllogictest/test_files/tpch/tpch.slt +++ b/datafusion/sqllogictest/test_files/tpch/tpch.slt @@ -21,6 +21,15 @@ include ./create_tables.slt.part include ./plans/q*.slt.part include ./answers/q*.slt.part +# test answers with uncorrelated scalar subqueries rewritten to joins +statement ok +set datafusion.optimizer.physical_uncorrelated_scalar_subquery = false; + +include ./answers/q*.slt.part + +statement ok +reset datafusion.optimizer.physical_uncorrelated_scalar_subquery; + # test answers with sort merge join statement ok set datafusion.optimizer.prefer_hash_join = false; From bcc0d7831a687b5c51bf9c5a93404718cd58346b Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Wed, 27 May 2026 11:24:56 +0200 Subject: [PATCH 5/5] Address code review --- datafusion/common/src/config.rs | 18 +++++++--- datafusion/core/src/physical_planner.rs | 4 +-- .../optimizer/src/scalar_subquery_to_join.rs | 33 +++++++++++++------ .../test_files/information_schema.slt | 4 +-- .../sqllogictest/test_files/subquery.slt | 8 ++--- .../sqllogictest/test_files/tpch/tpch.slt | 4 +-- docs/source/user-guide/configs.md | 2 +- 7 files changed, 47 insertions(+), 26 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 9f8e41fa6d19a..0d560b7161856 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1124,12 +1124,20 @@ config_namespace! { /// into the file scan phase. pub enable_topk_dynamic_filter_pushdown: bool, default = true - /// When set to true, uncorrelated scalar subqueries are left in - /// the logical plan and executed by `ScalarSubqueryExec` during physical - /// execution. 
When set to false, all scalar subqueries (including - /// uncorrelated ones) are rewritten to left joins by the + /// When set to true, uncorrelated scalar subqueries are + /// left in the logical plan and executed by `ScalarSubqueryExec` during + /// physical execution. When set to false, all scalar subqueries + /// (including uncorrelated ones) are rewritten to left joins by the /// `ScalarSubqueryToJoin` optimizer rule. - pub physical_uncorrelated_scalar_subquery: bool, default = true + /// + /// Note disabling this option is not recommended. It restores + /// pre-PR-21240 behavior, which silently produces incorrect results for + /// multi-row subqueries and does not support scalar subqueries in + /// ORDER BY / JOIN ON / aggregate-function arguments. This option is + /// intended as a temporary escape hatch for distributed execution + /// frameworks and is planned to be removed in a future DataFusion + /// release. + pub enable_physical_uncorrelated_scalar_subquery: bool, default = true /// When set to true, the optimizer will attempt to push down Join dynamic filters /// into the file scan phase. diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 4ebd25aa47b7d..43af743caa030 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -437,7 +437,7 @@ impl DefaultPhysicalPlanner { session_state: &'a SessionState, ) -> futures::future::BoxFuture<'a, Result>> { Box::pin(async move { - // When `physical_uncorrelated_scalar_subquery` is disabled, the + // When `enable_physical_uncorrelated_scalar_subquery` is disabled, the // `ScalarSubqueryToJoin` optimizer rule rewrites all uncorrelated // scalar subqueries to joins, so none should reach this point. 
// Skip collection in that case to avoid creating a no-op @@ -445,7 +445,7 @@ impl DefaultPhysicalPlanner { let all_subqueries = if session_state .config_options() .optimizer - .physical_uncorrelated_scalar_subquery + .enable_physical_uncorrelated_scalar_subquery { Self::collect_scalar_subqueries(logical_plan) } else { diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 8fb3a38bdc070..ff71556983b5a 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! [`ScalarSubqueryToJoin`] rewriting correlated scalar subquery filters to `JOIN`s +//! [`ScalarSubqueryToJoin`] rewriting scalar subquery filters to `JOIN`s use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; @@ -36,9 +36,14 @@ use datafusion_expr::logical_plan::{JoinType, Subquery}; use datafusion_expr::utils::conjunction; use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, lit, not, when}; -/// Optimizer rule that rewrites correlated scalar subquery filters to joins and -/// places an additional projection on top of the filter, to preserve the -/// original schema. +/// Optimizer rule that rewrites scalar subquery filters to joins and places an +/// additional projection on top of the filter to preserve the original schema. +/// +/// When [`datafusion_common::config::OptimizerOptions::enable_physical_uncorrelated_scalar_subquery`] is +/// true (the default), only *correlated* scalar subqueries are rewritten here; +/// uncorrelated ones are left for physical execution via `ScalarSubqueryExec`. +/// When the option is false, all scalar subqueries — correlated and +/// uncorrelated — are rewritten to left joins by this rule. 
#[derive(Default, Debug)] pub struct ScalarSubqueryToJoin {} @@ -93,7 +98,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { let physical_uncorrelated = config .options() .optimizer - .physical_uncorrelated_scalar_subquery; + .enable_physical_uncorrelated_scalar_subquery; // Optimization: skip the rest of the rule and its copies if // there are no scalar subqueries this rule should rewrite if !contains_scalar_subquery_to_rewrite( @@ -154,7 +159,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { let physical_uncorrelated = config .options() .optimizer - .physical_uncorrelated_scalar_subquery; + .enable_physical_uncorrelated_scalar_subquery; // Optimization: skip the rest of the rule and its copies if there // are no scalar subqueries this rule should rewrite if !projection.expr.iter().any(|expr| { @@ -246,7 +251,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { /// Returns true if the expression contains a scalar subquery that this rule /// should rewrite to a join. /// -/// When `physical_uncorrelated_scalar_subquery` is true (the default) only +/// When `enable_physical_uncorrelated_scalar_subquery` is true (the default) only /// correlated scalar subqueries are rewritten — uncorrelated ones are handled /// by the physical planner via `ScalarSubqueryExec`. When it is false, all /// scalar subqueries (correlated and uncorrelated) are rewritten. @@ -316,9 +321,15 @@ impl TreeNodeRewriter for ExtractScalarSubQuery<'_> { /// where c.balance > o."avg(total)" /// ``` /// +/// When [`datafusion_common::config::OptimizerOptions::enable_physical_uncorrelated_scalar_subquery`] is +/// false, this function also handles uncorrelated scalar subqueries, rewriting +/// them as a `Left Join: Filter: Boolean(true)` instead of leaving them for +/// `ScalarSubqueryExec`. +/// /// # Arguments /// -/// * `subquery` - The correlated scalar subquery to decorrelate. 
+/// * `subquery` - The scalar subquery to rewrite (correlated, or uncorrelated +/// when `enable_physical_uncorrelated_scalar_subquery` is false). /// * `outer_input` - The outer plan that the decorrelated subquery is /// left-joined onto — the input of the `Filter` or `Projection` node /// that contained the subquery. @@ -338,7 +349,7 @@ fn build_join( ) -> Result)>> { // `build_join` also handles uncorrelated scalar subqueries (as a left // join with `Boolean(true)`) when the - // `physical_uncorrelated_scalar_subquery` option is disabled. + // `enable_physical_uncorrelated_scalar_subquery` option is disabled. let subquery_plan = subquery.subquery.as_ref(); let mut pull_up = PullUpCorrelatedExpr::new().with_need_handle_count_bug(true); let decorrelated_subquery = subquery_plan.clone().rewrite(&mut pull_up).data()?; @@ -1205,7 +1216,9 @@ mod tests { let mut options = ConfigOptions::default(); options.optimizer.filter_null_join_keys = true; - options.optimizer.physical_uncorrelated_scalar_subquery = false; + options + .optimizer + .enable_physical_uncorrelated_scalar_subquery = false; let context = crate::OptimizerContext::new_with_config_options(Arc::new(options)); let rule: Arc = diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index ec17b1b256640..ff517700822b5 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -303,6 +303,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_dynamic_filter_pushdown true datafusion.optimizer.enable_join_dynamic_filter_pushdown true datafusion.optimizer.enable_leaf_expression_pushdown true +datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery true datafusion.optimizer.enable_piecewise_merge_join false datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_sort_pushdown true @@ 
-320,7 +321,6 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576 datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 datafusion.optimizer.join_reordering true datafusion.optimizer.max_passes 3 -datafusion.optimizer.physical_uncorrelated_scalar_subquery true datafusion.optimizer.prefer_existing_sort false datafusion.optimizer.prefer_existing_union false datafusion.optimizer.prefer_hash_join true @@ -454,6 +454,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to tru datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. datafusion.optimizer.enable_leaf_expression_pushdown true When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes. 
+datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery true When set to true, uncorrelated scalar subqueries are left in the logical plan and executed by `ScalarSubqueryExec` during physical execution. When set to false, all scalar subqueries (including uncorrelated ones) are rewritten to left joins by the `ScalarSubqueryToJoin` optimizer rule. Note disabling this option is not recommended. It restores pre-PR-21240 behavior, which silently produces incorrect results for multi-row subqueries and does not support scalar subqueries in ORDER BY / JOIN ON / aggregate-function arguments. This option is intended as a temporary escape hatch for distributed execution frameworks and is planned to be removed in a future DataFusion release. datafusion.optimizer.enable_piecewise_merge_join false When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter. datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores datafusion.optimizer.enable_sort_pushdown true Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. 
Default: true @@ -471,7 +472,6 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum es datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition datafusion.optimizer.join_reordering true When set to true, the physical plan optimizer may swap join inputs based on statistics. When set to false, statistics-driven join input reordering is disabled and the original join order in the query is used. datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan -datafusion.optimizer.physical_uncorrelated_scalar_subquery true When set to true, uncorrelated scalar subqueries are left in the logical plan and executed by `ScalarSubqueryExec` during physical execution. When set to false, all scalar subqueries (including uncorrelated ones) are rewritten to left joins by the `ScalarSubqueryToJoin` optimizer rule. datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. 
HashJoin can work more efficiently than SortMergeJoin but consumes more memory diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index ff39528c4a73b..f908348e47704 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -2093,10 +2093,10 @@ SELECT (SELECT v FROM (SELECT 1 AS v UNION ALL SELECT 2) AS t ORDER BY v LIMIT 1 ############# ## End-to-end correctness coverage for the flag-off path. -## When `datafusion.optimizer.physical_uncorrelated_scalar_subquery` is false, +## When `datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery` is false, ## uncorrelated scalar subqueries are rewritten to left joins by ## `ScalarSubqueryToJoin` instead of executed by `ScalarSubqueryExec`. This -## restores pre-PR-21240 behavior, which has three known shortcomings the +## restores pre-PR-21240 behavior, which has two known shortcomings the ## physical-execution path was built to fix: multi-row subqueries silently ## return wrong results, and uncorrelated scalar subqueries do not work in ## ORDER BY / JOIN ON / aggregate-function arguments. 
Those cases are @@ -2105,7 +2105,7 @@ SELECT (SELECT v FROM (SELECT 1 AS v UNION ALL SELECT 2) AS t ORDER BY v LIMIT 1 ############# statement ok -set datafusion.optimizer.physical_uncorrelated_scalar_subquery = false; +set datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery = false; # Scalar subquery returning exactly one row → success query I @@ -2178,7 +2178,7 @@ SELECT 1 = (SELECT CAST(NULL AS INT)); NULL statement ok -RESET datafusion.optimizer.physical_uncorrelated_scalar_subquery; +RESET datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery; statement count 0 DROP TABLE sq_values; diff --git a/datafusion/sqllogictest/test_files/tpch/tpch.slt b/datafusion/sqllogictest/test_files/tpch/tpch.slt index ecf339c466242..b893ff61cd1b7 100644 --- a/datafusion/sqllogictest/test_files/tpch/tpch.slt +++ b/datafusion/sqllogictest/test_files/tpch/tpch.slt @@ -23,12 +23,12 @@ include ./answers/q*.slt.part # test answers with uncorrelated scalar subqueries rewritten to joins statement ok -set datafusion.optimizer.physical_uncorrelated_scalar_subquery = false; +set datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery = false; include ./answers/q*.slt.part statement ok -reset datafusion.optimizer.physical_uncorrelated_scalar_subquery; +reset datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery; # test answers with sort merge join statement ok diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 53ebb72dc9f06..977afce273030 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -144,7 +144,7 @@ The following configuration settings are available: | datafusion.optimizer.enable_window_topn | false | When set to true, the optimizer will replace Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a PartitionedTopKExec that maintains per-partition heaps, avoiding a full sort of the input. 
When the window partition key has low cardinality, enabling this optimization can improve performance. However, for high cardinality keys, it may cause regressions in both memory usage and runtime. | | datafusion.optimizer.enable_topk_repartition | true | When set to true, the optimizer will push TopK (Sort with fetch) below hash repartition when the partition key is a prefix of the sort key, reducing data volume before the shuffle. | | datafusion.optimizer.enable_topk_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase. | -| datafusion.optimizer.physical_uncorrelated_scalar_subquery | true | When set to true, uncorrelated scalar subqueries are left in the logical plan and executed by `ScalarSubqueryExec` during physical execution. When set to false, all scalar subqueries (including uncorrelated ones) are rewritten to left joins by the `ScalarSubqueryToJoin` optimizer rule. | +| datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery | true | When set to true, uncorrelated scalar subqueries are left in the logical plan and executed by `ScalarSubqueryExec` during physical execution. When set to false, all scalar subqueries (including uncorrelated ones) are rewritten to left joins by the `ScalarSubqueryToJoin` optimizer rule. Note disabling this option is not recommended. It restores pre-PR-21240 behavior, which silently produces incorrect results for multi-row subqueries and does not support scalar subqueries in ORDER BY / JOIN ON / aggregate-function arguments. This option is intended as a temporary escape hatch for distributed execution frameworks and is planned to be removed in a future DataFusion release. | | datafusion.optimizer.enable_join_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. 
| | datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. | | datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. |