Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,13 @@ config_namespace! {
/// into the file scan phase.
pub enable_topk_dynamic_filter_pushdown: bool, default = true

/// When set to true, uncorrelated scalar subqueries are left in
/// the logical plan and executed by `ScalarSubqueryExec` during physical
/// execution. When set to false, all scalar subqueries (including
/// uncorrelated ones) are rewritten to left joins by the
/// `ScalarSubqueryToJoin` optimizer rule.
pub physical_uncorrelated_scalar_subquery: bool, default = true
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other similar config options use the phrasing enable_...; we should probably adopt that for consistency.

physical_uncorrelated_scalar_subquery is also a mouthful, although I can't immediately think of a more concise name that is also accurate.


/// When set to true, the optimizer will attempt to push down Join dynamic filters
/// into the file scan phase.
pub enable_join_dynamic_filter_pushdown: bool, default = true
Expand Down
15 changes: 14 additions & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,20 @@ impl DefaultPhysicalPlanner {
session_state: &'a SessionState,
) -> futures::future::BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
Box::pin(async move {
let all_subqueries = Self::collect_scalar_subqueries(logical_plan);
// When `physical_uncorrelated_scalar_subquery` is disabled, the
// `ScalarSubqueryToJoin` optimizer rule rewrites all uncorrelated
// scalar subqueries to joins, so none should reach this point.
// Skip collection in that case to avoid creating a no-op
// `ScalarSubqueryExec` wrapper.
let all_subqueries = if session_state
.config_options()
.optimizer
.physical_uncorrelated_scalar_subquery
{
Self::collect_scalar_subqueries(logical_plan)
} else {
Vec::new()
};
let (links, index_map) = self
.plan_scalar_subqueries(all_subqueries, session_state)
.await?;
Expand Down
116 changes: 95 additions & 21 deletions datafusion/optimizer/src/scalar_subquery_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ impl ScalarSubqueryToJoin {
&self,
predicate: &Expr,
alias_gen: &Arc<AliasGenerator>,
physical_uncorrelated: bool,
) -> Result<(Vec<(Subquery, String)>, Expr)> {
let mut extract = ExtractScalarSubQuery {
sub_query_info: vec![],
alias_gen,
physical_uncorrelated,
};
predicate
.clone()
Expand All @@ -88,15 +90,23 @@ impl OptimizerRule for ScalarSubqueryToJoin {
) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
let physical_uncorrelated = config
.options()
.optimizer
.physical_uncorrelated_scalar_subquery;
// Optimization: skip the rest of the rule and its copies if
// there are no scalar subqueries
if !contains_correlated_scalar_subquery(&filter.predicate) {
// there are no scalar subqueries this rule should rewrite
if !contains_scalar_subquery_to_rewrite(
&filter.predicate,
physical_uncorrelated,
) {
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}

let (subqueries, mut rewrite_expr) = self.extract_subquery_exprs(
&filter.predicate,
config.alias_generator(),
physical_uncorrelated,
)?;

assert_or_internal_err!(
Expand Down Expand Up @@ -141,13 +151,15 @@ impl OptimizerRule for ScalarSubqueryToJoin {
Ok(Transformed::yes(new_plan))
}
LogicalPlan::Projection(projection) => {
let physical_uncorrelated = config
.options()
.optimizer
.physical_uncorrelated_scalar_subquery;
// Optimization: skip the rest of the rule and its copies if there
// are no correlated scalar subqueries
if !projection
.expr
.iter()
.any(contains_correlated_scalar_subquery)
{
// are no scalar subqueries this rule should rewrite
if !projection.expr.iter().any(|expr| {
contains_scalar_subquery_to_rewrite(expr, physical_uncorrelated)
}) {
return Ok(Transformed::no(LogicalPlan::Projection(projection)));
}

Expand All @@ -156,8 +168,11 @@ impl OptimizerRule for ScalarSubqueryToJoin {
let mut rewrite_exprs: Vec<Expr> =
Vec::with_capacity(projection.expr.len());
for (idx, expr) in projection.expr.iter().enumerate() {
let (subqueries, rewrite_expr) =
self.extract_subquery_exprs(expr, config.alias_generator())?;
let (subqueries, rewrite_expr) = self.extract_subquery_exprs(
expr,
config.alias_generator(),
physical_uncorrelated,
)?;
for (_, alias) in &subqueries {
alias_to_index.insert(alias.clone(), idx);
}
Expand Down Expand Up @@ -228,29 +243,42 @@ impl OptimizerRule for ScalarSubqueryToJoin {
}
}

/// Returns true if the expression contains a correlated scalar subquery, false
/// otherwise. Uncorrelated scalar subqueries are handled by the physical
/// planner via `ScalarSubqueryExec` and do not need to be converted to joins.
fn contains_correlated_scalar_subquery(expr: &Expr) -> bool {
/// Returns true if the expression contains a scalar subquery that this rule
/// should rewrite to a join.
///
/// When `physical_uncorrelated_scalar_subquery` is true (the default) only
/// correlated scalar subqueries are rewritten — uncorrelated ones are handled
/// by the physical planner via `ScalarSubqueryExec`. When it is false, all
/// scalar subqueries (correlated and uncorrelated) are rewritten.
fn contains_scalar_subquery_to_rewrite(expr: &Expr, physical_uncorrelated: bool) -> bool {
expr.exists(|expr| {
Ok(matches!(expr, Expr::ScalarSubquery(sq) if !sq.outer_ref_columns.is_empty()))
Ok(matches!(
expr,
Expr::ScalarSubquery(sq)
if !physical_uncorrelated || !sq.outer_ref_columns.is_empty()
))
})
.expect("Inner is always Ok")
}

struct ExtractScalarSubQuery<'a> {
sub_query_info: Vec<(Subquery, String)>,
alias_gen: &'a Arc<AliasGenerator>,
physical_uncorrelated: bool,
}

impl TreeNodeRewriter for ExtractScalarSubQuery<'_> {
type Node = Expr;

fn f_down(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
match expr {
// Skip uncorrelated scalar subqueries
// Match scalar subqueries this rule should rewrite to a join. When
// `physical_uncorrelated` is true, only correlated subqueries are
// rewritten — uncorrelated ones are handled later by the physical
// planner. When false, both are rewritten.
Expr::ScalarSubquery(ref subquery)
if !subquery.outer_ref_columns.is_empty() =>
if !self.physical_uncorrelated
|| !subquery.outer_ref_columns.is_empty() =>
{
let subquery = subquery.clone();
let scalar_expr = subquery
Expand Down Expand Up @@ -308,10 +336,9 @@ fn build_join(
outer_input: &LogicalPlan,
subquery_alias: &str,
) -> Result<Option<(LogicalPlan, HashMap<Column, Expr>)>> {
assert_or_internal_err!(
!subquery.outer_ref_columns.is_empty(),
"build_join should only be called for correlated subqueries"
);
// `build_join` also handles uncorrelated scalar subqueries (as a left
// join with `Boolean(true)`) when the
// `physical_uncorrelated_scalar_subquery` option is disabled.
let subquery_plan = subquery.subquery.as_ref();
let mut pull_up = PullUpCorrelatedExpr::new().with_need_handle_count_bug(true);
let decorrelated_subquery = subquery_plan.clone().rewrite(&mut pull_up).data()?;
Expand Down Expand Up @@ -1159,4 +1186,51 @@ mod tests {
"
)
}

#[test]
fn uncorrelated_scalar_subquery_rewritten_when_flag_off() -> Result<()> {
use datafusion_common::config::ConfigOptions;

let sq = Arc::new(
LogicalPlanBuilder::from(scan_tpch_table("orders"))
.aggregate(Vec::<Expr>::new(), vec![max(col("orders.o_custkey"))])?
.project(vec![max(col("orders.o_custkey"))])?
.build()?,
);

let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
.filter(col("customer.c_custkey").eq(scalar_subquery(sq)))?
.project(vec![col("customer.c_custkey")])?
.build()?;

let mut options = ConfigOptions::default();
options.optimizer.filter_null_join_keys = true;
options.optimizer.physical_uncorrelated_scalar_subquery = false;
let context = crate::OptimizerContext::new_with_config_options(Arc::new(options));

let rule: Arc<dyn OptimizerRule + Send + Sync> =
Arc::new(ScalarSubqueryToJoin::new());
let optimizer = crate::Optimizer::with_rules(vec![rule]);
let optimized_plan = optimizer
.optimize(plan, &context, |_, _| {})
.expect("failed to optimize plan");
let formatted_plan = optimized_plan.display_indent_schema();

insta::assert_snapshot!(
formatted_plan,
@r"
Projection: customer.c_custkey [c_custkey:Int64]
Projection: customer.c_custkey, customer.c_name [c_custkey:Int64, c_name:Utf8]
Filter: customer.c_custkey = __scalar_sq_1.max(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]
Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]
TableScan: customer [c_custkey:Int64, c_name:Utf8]
SubqueryAlias: __scalar_sq_1 [max(orders.o_custkey):Int64;N]
Projection: max(orders.o_custkey) [max(orders.o_custkey):Int64;N]
Aggregate: groupBy=[[]], aggr=[[max(orders.o_custkey)]] [max(orders.o_custkey):Int64;N]
TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
"
);

Ok(())
}
}
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072
datafusion.optimizer.join_reordering true
datafusion.optimizer.max_passes 3
datafusion.optimizer.physical_uncorrelated_scalar_subquery true
datafusion.optimizer.prefer_existing_sort false
datafusion.optimizer.prefer_existing_union false
datafusion.optimizer.prefer_hash_join true
Expand Down Expand Up @@ -470,6 +471,7 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum es
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.join_reordering true When set to true, the physical plan optimizer may swap join inputs based on statistics. When set to false, statistics-driven join input reordering is disabled and the original join order in the query is used.
datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan
datafusion.optimizer.physical_uncorrelated_scalar_subquery true When set to true, uncorrelated scalar subqueries are left in the logical plan and executed by `ScalarSubqueryExec` during physical execution. When set to false, all scalar subqueries (including uncorrelated ones) are rewritten to left joins by the `ScalarSubqueryToJoin` optimizer rule.
datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave
datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory
Expand Down
89 changes: 89 additions & 0 deletions datafusion/sqllogictest/test_files/subquery.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2091,6 +2091,95 @@ SELECT (SELECT v FROM (SELECT 1 AS v UNION ALL SELECT 2) AS t ORDER BY v LIMIT 1
----
1

#############
## End-to-end correctness coverage for the flag-off path.
## When `datafusion.optimizer.physical_uncorrelated_scalar_subquery` is false,
## uncorrelated scalar subqueries are rewritten to left joins by
## `ScalarSubqueryToJoin` instead of executed by `ScalarSubqueryExec`. This
## restores pre-PR-21240 behavior, which has three known shortcomings the
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you only list two known shortcomings?

## physical-execution path was built to fix: multi-row subqueries silently
## return wrong results, and uncorrelated scalar subqueries do not work in
## ORDER BY / JOIN ON / aggregate-function arguments. Those cases are
## intentionally not covered here; the queries below are the ones where both
## paths agree.
Comment on lines +2098 to +2104
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were known limitations/bugs #21240 fixed

#############

statement ok
set datafusion.optimizer.physical_uncorrelated_scalar_subquery = false;

# Scalar subquery returning exactly one row → success
query I
SELECT (SELECT v FROM sq_values LIMIT 1);
----
1

# Scalar subquery returning exactly one row in WHERE → success
query I rowsort
SELECT x FROM sq_main WHERE x > (SELECT v FROM sq_values LIMIT 1);
----
10
20

# Scalar subquery returning zero rows → NULL
query I
SELECT (SELECT v FROM sq_empty);
----
NULL

# Scalar subquery returning zero rows in arithmetic → NULL propagation
query I
SELECT x + (SELECT v FROM sq_empty) FROM sq_main;
----
NULL
NULL

# Scalar subquery returning zero rows in WHERE comparison → no matching rows
query I
SELECT x FROM sq_main WHERE x > (SELECT v FROM sq_empty);
----

# Aggregated subquery always returns one row, even on empty input → success
query I
SELECT (SELECT count(*) FROM sq_empty);
----
0

# Aggregated subquery on multi-row table → success
query I
SELECT (SELECT max(v) FROM sq_values);
----
3

# HAVING clause with uncorrelated scalar subquery
query II rowsort
SELECT x, count(*) AS cnt FROM sq_main GROUP BY x
HAVING count(*) > (SELECT min(v) FROM sq_values);
----

# CASE WHEN with uncorrelated scalar subquery as condition
query T rowsort
SELECT CASE WHEN x > (SELECT min(v) FROM sq_values)
THEN 'big' ELSE 'small' END AS label
FROM sq_main;
----
big
big

# Doubly-nested constant subquery
query I
SELECT (SELECT (SELECT 42));
----
42

# NULL comparison semantics through subquery boundary
query B
SELECT 1 = (SELECT CAST(NULL AS INT));
----
NULL

statement ok
RESET datafusion.optimizer.physical_uncorrelated_scalar_subquery;

statement count 0
DROP TABLE sq_values;

Expand Down
9 changes: 9 additions & 0 deletions datafusion/sqllogictest/test_files/tpch/tpch.slt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ include ./create_tables.slt.part
include ./plans/q*.slt.part
include ./answers/q*.slt.part

# test answers with uncorrelated scalar subqueries rewritten to joins
statement ok
set datafusion.optimizer.physical_uncorrelated_scalar_subquery = false;

include ./answers/q*.slt.part

statement ok
reset datafusion.optimizer.physical_uncorrelated_scalar_subquery;

# test answers with sort merge join
statement ok
set datafusion.optimizer.prefer_hash_join = false;
Expand Down
Loading
Loading