diff --git a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
index 572ae83540892..b8ebc80348134 100644
--- a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
@@ -714,3 +714,123 @@ fn no_limit_preserves_plan_identity() -> Result<()> {
 
     Ok(())
 }
+
+#[test]
+fn outer_offset_does_not_leak_through_sort_into_inner_limit() -> Result<()> {
+    // Regression test for https://github.com/apache/datafusion/issues/22489
+    //
+    // An outer OFFSET separated from an inner LIMIT by a SortExec on a
+    // different key must not shrink the inner fetch. Before the fix,
+    // combine_limit merged the two into GlobalLimitExec(skip=1, fetch=7);
+    // the expected plan keeps skip=1 on the outer limit and folds the
+    // inner LIMIT 8 into the inner sort as TopK(fetch=8).
+    //
+    // Plan structure:
+    //   GlobalLimitExec: skip=1, fetch=None    (outer OFFSET 1)
+    //     SortExec: [c1 DESC]                  (outer sort — different key)
+    //       GlobalLimitExec: skip=0, fetch=8   (inner LIMIT 8)
+    //         SortExec: [c2 ASC]               (inner sort — different key)
+    //           EmptyExec
+    let schema = create_schema();
+    let empty = empty_exec(Arc::clone(&schema));
+
+    let inner_ordering: LexOrdering = [PhysicalSortExpr {
+        expr: col("c2", &schema)?,
+        options: SortOptions::default(),
+    }]
+    .into();
+    let inner_sort = sort_exec(inner_ordering, empty);
+    let inner_limit = global_limit_exec(inner_sort, 0, Some(8));
+
+    let outer_ordering: LexOrdering = [PhysicalSortExpr {
+        expr: col("c1", &schema)?,
+        options: SortOptions {
+            descending: true,
+            nulls_first: false,
+        },
+    }]
+    .into();
+    let outer_sort = sort_exec(outer_ordering, inner_limit);
+    let outer_limit = global_limit_exec(outer_sort, 1, None);
+
+    let initial = format_plan(&outer_limit);
+    insta::assert_snapshot!(
+        initial,
+        @r"
+    GlobalLimitExec: skip=1, fetch=None
+      SortExec: expr=[c1@0 DESC NULLS LAST], preserve_partitioning=[false]
+        GlobalLimitExec: skip=0, fetch=8
+          SortExec: expr=[c2@1 ASC], preserve_partitioning=[false]
+            EmptyExec
+    "
+    );
+
+    let after_optimize =
+        LimitPushdown::new().optimize(outer_limit, &ConfigOptions::new())?;
+    let optimized = format_plan(&after_optimize);
+    insta::assert_snapshot!(
+        optimized,
+        @r"
+    GlobalLimitExec: skip=1, fetch=None
+      SortExec: expr=[c1@0 DESC NULLS LAST], preserve_partitioning=[false]
+        SortExec: TopK(fetch=8), expr=[c2@1 ASC], preserve_partitioning=[false]
+          EmptyExec
+    "
+    );
+
+    Ok(())
+}
+
+#[test]
+fn outer_offset_with_same_sort_key_still_pushes_limit() -> Result<()> {
+    // Companion to outer_offset_does_not_leak_through_sort_into_inner_limit:
+    // with the *same* key on both sorts, the inner LIMIT 8 is still pushed
+    // into the inner SortExec as TopK(fetch=8); the outer skip=1 stays put.
+    //
+    // Plan structure:
+    //   GlobalLimitExec: skip=1, fetch=None    (outer OFFSET 1)
+    //     SortExec: [c1 ASC]                   (outer sort — same key)
+    //       GlobalLimitExec: skip=0, fetch=8   (inner LIMIT 8)
+    //         SortExec: [c1 ASC]               (inner sort — same key)
+    //           EmptyExec
+    let schema = create_schema();
+    let empty = empty_exec(Arc::clone(&schema));
+
+    let ordering: LexOrdering = [PhysicalSortExpr {
+        expr: col("c1", &schema)?,
+        options: SortOptions::default(),
+    }]
+    .into();
+
+    let inner_sort = sort_exec(ordering.clone(), empty);
+    let inner_limit = global_limit_exec(inner_sort, 0, Some(8));
+    let outer_sort = sort_exec(ordering, inner_limit);
+    let outer_limit = global_limit_exec(outer_sort, 1, None);
+
+    let initial = format_plan(&outer_limit);
+    insta::assert_snapshot!(
+        initial,
+        @r"
+    GlobalLimitExec: skip=1, fetch=None
+      SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]
+        GlobalLimitExec: skip=0, fetch=8
+          SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]
+            EmptyExec
+    "
+    );
+
+    let after_optimize =
+        LimitPushdown::new().optimize(outer_limit, &ConfigOptions::new())?;
+    let optimized = format_plan(&after_optimize);
+    insta::assert_snapshot!(
+        optimized,
+        @r"
+    GlobalLimitExec: skip=1, fetch=None
+      SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]
+        SortExec: TopK(fetch=8), expr=[c1@0 ASC], preserve_partitioning=[false]
+          EmptyExec
+    "
+    );
+
+    Ok(())
+}
diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs
index 6164d86e5342a..63c4f21bd9d6d 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -375,6 +375,14 @@ pub(crate) fn pushdown_limits(
         (new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
     }
 
+    // When `satisfied` is set, the limit has already been materialized
+    // above the current node, so child subtrees must not inherit its
+    // `skip`. Keep `fetch`, but zero `skip` before recursing so limits
+    // local to a child are not merged with an `OFFSET` already applied.
+    if global_state.satisfied {
+        global_state.skip = 0;
+    }
+
     // Apply pushdown limits in children
     let children = new_node.data.children();
     let mut changed = false;
diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt
index fc62584dc3df1..ca2b36727d627 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -989,3 +989,58 @@ c-4
 
 statement ok
 DROP TABLE t21176;
+
+# Regression test for https://github.com/apache/datafusion/issues/22489
+# An outer ORDER BY / OFFSET must not reduce an inner LIMIT when the two are
+# separated by a sort on a *different* key. EXPLAIN below uses 4 partitions.
+
+statement ok
+set datafusion.execution.target_partitions = 4;
+
+statement ok
+CREATE TABLE t22489 (g INT, x INT, y INT) AS VALUES (1, 10, 4), (2, 20, 3), (3, 30, 2), (4, 40, 1);
+
+# Inner ORDER BY sx DESC LIMIT 4 keeps all four groups; the outer ORDER BY
+# sy DESC OFFSET 1 then drops only the sy-max group (g=1), so g=2,3,4 remain.
+query III
+SELECT * FROM (
+  SELECT g, SUM(x) AS sx, SUM(y) AS sy FROM t22489 GROUP BY g
+  ORDER BY sx DESC LIMIT 4
+) q
+ORDER BY sy DESC
+OFFSET 1;
+----
+2 20 3
+3 30 2
+4 40 1
+
+query TT
+EXPLAIN
+SELECT * FROM (
+  SELECT g, SUM(x) AS sx, SUM(y) AS sy FROM t22489 GROUP BY g
+  ORDER BY sx DESC LIMIT 4
+) q
+ORDER BY sy DESC
+OFFSET 1;
+----
+logical_plan
+01)Limit: skip=1, fetch=None
+02)--Sort: q.sy DESC NULLS FIRST
+03)----SubqueryAlias: q
+04)------Sort: sx DESC NULLS FIRST, fetch=4
+05)--------Projection: t22489.g, sum(t22489.x) AS sx, sum(t22489.y) AS sy
+06)----------Aggregate: groupBy=[[t22489.g]], aggr=[[sum(CAST(t22489.x AS Int64)), sum(CAST(t22489.y AS Int64))]]
+07)------------TableScan: t22489 projection=[g, x, y]
+physical_plan
+01)GlobalLimitExec: skip=1, fetch=None
+02)--SortExec: expr=[sy@2 DESC], preserve_partitioning=[false]
+03)----SortPreservingMergeExec: [sx@1 DESC], fetch=4
+04)------SortExec: TopK(fetch=4), expr=[sx@1 DESC], preserve_partitioning=[true]
+05)--------ProjectionExec: expr=[g@0 as g, sum(t22489.x)@1 as sx, sum(t22489.y)@2 as sy]
+06)----------AggregateExec: mode=FinalPartitioned, gby=[g@0 as g], aggr=[sum(t22489.x), sum(t22489.y)]
+07)------------RepartitionExec: partitioning=Hash([g@0], 4), input_partitions=1
+08)--------------AggregateExec: mode=Partial, gby=[g@0 as g], aggr=[sum(t22489.x), sum(t22489.y)]
+09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+DROP TABLE t22489;