Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions datafusion/core/tests/physical_optimizer/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -714,3 +714,69 @@ fn no_limit_preserves_plan_identity() -> Result<()> {

Ok(())
}

#[test]
fn outer_offset_does_not_leak_through_sort_into_inner_limit() -> Result<()> {
// Regression test for https://github.com/apache/datafusion/issues/22489
//
// When an outer OFFSET is separated from an inner LIMIT by a SortExec
// with different sort keys, the outer skip must not reduce the inner
// fetch. Before the fix, combine_limit merged them, producing
// GlobalLimitExec(skip=1, fetch=7) instead of preserving the inner
// LIMIT 8.
//
// Plan structure:
// GlobalLimitExec: skip=1, fetch=None (outer OFFSET 1)
// SortExec: [c1 DESC] (outer sort — different key)
// GlobalLimitExec: skip=0, fetch=8 (inner LIMIT 8)
// SortExec: [c2 ASC] (inner sort — different key)
// EmptyExec
let schema = create_schema();
let empty = empty_exec(Arc::clone(&schema));

let inner_ordering: LexOrdering = [PhysicalSortExpr {
expr: col("c2", &schema)?,
options: SortOptions::default(),
}]
.into();
let inner_sort = sort_exec(inner_ordering, empty);
let inner_limit = global_limit_exec(inner_sort, 0, Some(8));

let outer_ordering: LexOrdering = [PhysicalSortExpr {
expr: col("c1", &schema)?,
options: SortOptions {
descending: true,
nulls_first: false,
},
}]
.into();
let outer_sort = sort_exec(outer_ordering, inner_limit);
let outer_limit = global_limit_exec(outer_sort, 1, None);

let initial = format_plan(&outer_limit);
insta::assert_snapshot!(
initial,
@r"
GlobalLimitExec: skip=1, fetch=None
SortExec: expr=[c1@0 DESC NULLS LAST], preserve_partitioning=[false]
GlobalLimitExec: skip=0, fetch=8
SortExec: expr=[c2@1 ASC], preserve_partitioning=[false]
EmptyExec
"
);

let after_optimize =
LimitPushdown::new().optimize(outer_limit, &ConfigOptions::new())?;
let optimized = format_plan(&after_optimize);
insta::assert_snapshot!(
optimized,
@r"
GlobalLimitExec: skip=1, fetch=None
SortExec: expr=[c1@0 DESC NULLS LAST], preserve_partitioning=[false]
SortExec: TopK(fetch=8), expr=[c2@1 ASC], preserve_partitioning=[false]
EmptyExec
"
);

Ok(())
}
8 changes: 8 additions & 0 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,14 @@ pub(crate) fn pushdown_limits(
(new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
}

// Once a limit has been materialized above the current node, child
// subtrees should not inherit its `skip`. Keep `fetch`, but clear
// `skip` before recursing so child-local limits are not merged with
// an `OFFSET` that has already been applied.
if global_state.satisfied {
global_state.skip = 0;
}

// Apply pushdown limits in children
let children = new_node.data.children();
let mut changed = false;
Expand Down
31 changes: 31 additions & 0 deletions datafusion/sqllogictest/test_files/limit.slt
Original file line number Diff line number Diff line change
Expand Up @@ -989,3 +989,34 @@ c-4

statement ok
DROP TABLE t21176;

# Regression test for https://github.com/apache/datafusion/issues/22489
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this guards against the issue:

I reverted the code change locally

diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs
index 63c4f21bd9..6164d86e53 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -375,14 +375,6 @@ pub(crate) fn pushdown_limits(
         (new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
     }

-    // Once a limit has been materialized above the current node, child
-    // subtrees should not inherit its `skip`. Keep `fetch`, but clear
-    // `skip` before recursing so child-local limits are not merged with
-    // an `OFFSET` that has already been applied.
-    if global_state.satisfied {
-        global_state.skip = 0;
-    }
-
     // Apply pushdown limits in children
     let children = new_node.data.children();
     let mut changed = false;

And then I ran the tests:

cargo test --profile=ci --test sqllogictests
...
Running with 16 test threads (available parallelism: 16)
Completed 472 test files in 9 seconds

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# An outer ORDER BY / OFFSET must not reduce an inner LIMIT when separated
# by a sort with different keys.

statement ok
CREATE TABLE t22489_t0 (id INT) AS VALUES (0), (1);

statement ok
CREATE TABLE t22489_t1 (id INT, j INT) AS VALUES (1, 1);

query III
SELECT * FROM (
SELECT * FROM (
SELECT t0.id, COUNT(t0.id) AS count_id, COUNT(DISTINCT j) AS nunique_j
FROM t22489_t0 t0 LEFT JOIN t22489_t1 t1 ON t0.id = t1.id
GROUP BY t0.id
) q
ORDER BY id DESC NULLS LAST, count_id DESC NULLS LAST, nunique_j DESC NULLS LAST
LIMIT 8
) q2
ORDER BY id DESC NULLS LAST, count_id DESC NULLS LAST, nunique_j ASC NULLS LAST
OFFSET 1;
----
0 1 0

statement ok
DROP TABLE t22489_t0;

statement ok
DROP TABLE t22489_t1;
Loading