From e69fe3ea8de686116f3b3ebf28665d747fccee49 Mon Sep 17 00:00:00 2001 From: lichi Date: Wed, 27 May 2026 21:14:25 +0800 Subject: [PATCH 1/2] fix(join): avoid join amplification in scalar subquery window rewrite --- .../AggScalarSubQueryToWindowFunction.java | 30 ++++++-- ...AggScalarSubQueryToWindowFunctionTest.java | 64 ++++++++++++++++ ...ted_scalar_subquery_to_window_function.out | 7 ++ ..._scalar_subquery_to_window_function.groovy | 73 +++++++++++++++++++ 4 files changed, 168 insertions(+), 6 deletions(-) create mode 100644 regression-test/data/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.out create mode 100644 regression-test/suites/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java index dd9fa09843693f..3054f5bc7e18d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java @@ -95,6 +95,7 @@ public class AggScalarSubQueryToWindowFunction extends DefaultPlanRewriter innerPlans = Lists.newArrayList(); private final List functions = Lists.newArrayList(); private final Map innerOuterSlotMap = Maps.newHashMap(); + private final List partitionBySlots = Lists.newArrayList(); /** * the entrance of this rule. we only override one visitor: visitLogicalFilter @@ -130,6 +131,7 @@ private Optional> findApply(LogicalFilter outerFilter, LogicalApply apply) { + resetState(); outerPlans.addAll(apply.child(0).collect(LogicalPlan.class::isInstance)); innerPlans.addAll(apply.child(1).collect(LogicalPlan.class::isInstance)); @@ -138,10 +140,18 @@ && checkApply(apply) && checkAggregate() && checkJoin() && checkProject() - && checkRelation(apply.getCorrelationSlot()) + && checkRelation(apply) && checkFilter(outerFilter); } + private void resetState() { + outerPlans.clear(); + innerPlans.clear(); + functions.clear(); + innerOuterSlotMap.clear(); + partitionBySlots.clear(); + } + // check children's nodes because query process will be changed private boolean checkPlanType() { return outerPlans.stream().allMatch(p -> OUTER_SUPPORTED_PLAN.stream().anyMatch(c -> c.isInstance(p))) @@ -250,7 +260,8 @@ private boolean checkProject() { * 2. outer table list - inner table list should only remain 1 table * 3. the remaining table in step 2 should be correlated table for inner plan */ - private boolean checkRelation(List correlatedSlots) { + private boolean checkRelation(LogicalApply apply) { + List correlatedSlots = apply.getCorrelationSlot(); List outerTables = outerPlans.stream().filter(CatalogRelation.class::isInstance) .map(CatalogRelation.class::cast) .collect(Collectors.toList()); @@ -278,6 +289,12 @@ private boolean checkRelation(List correlatedSlots) { .filter(node -> outerIds.contains(node.getTable().getId())) .map(LogicalRelation.class::cast) .map(LogicalRelation::getOutputExprIdSet).flatMap(Collection::stream).collect(Collectors.toSet()); + partitionBySlots.addAll(apply.left().getOutput().stream() + .filter(slot -> correlatedRelationOutput.contains(slot.getExprId())) + .collect(Collectors.toList())); + if (partitionBySlots.isEmpty()) { + return false; + } return correlatedSlots.stream().allMatch(e -> correlatedRelationOutput.contains(e.getExprId())); } @@ -346,7 +363,7 @@ private Plan rewrite(LogicalFilter filter, LogicalApply filter, LogicalApply correlatedSlots, AggregateFunction function) { - // partition by clause is set by all the correlated slots. - return new WindowExpression(function, ImmutableList.copyOf(correlatedSlots), Collections.emptyList()); + private WindowExpression createWindowFunction(AggregateFunction function) { + // Partition by all visible slots from the outer-only relation so join-expanded rows from + // that relation do not change the scalar subquery result. + return new WindowExpression(function, ImmutableList.copyOf(partitionBySlots), Collections.emptyList()); } private static class ExpressionIdenticalChecker extends DefaultExpressionVisitor { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunctionTest.java index 443dbaebd8f84b..9ecc77a873232a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunctionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunctionTest.java @@ -19,15 +19,24 @@ import org.apache.doris.nereids.datasets.tpch.TPCHTestBase; import org.apache.doris.nereids.datasets.tpch.TPCHUtils; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.WindowExpression; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalWindow; import org.apache.doris.nereids.util.MemoPatternMatchSupported; import org.apache.doris.nereids.util.PlanChecker; +import com.google.common.collect.ImmutableSet; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + public class AggScalarSubQueryToWindowFunctionTest extends TPCHTestBase implements MemoPatternMatchSupported { private static final String SQL_TEMPLATE = " select\n" + " sum(l_extendedprice) / 7.0 as avg_yearly\n" @@ -338,6 +347,61 @@ public void testNotMatchTheRule() { } } + @Test + public void testWindowPartitionsByOuterOnlyRelationSlots() throws Exception { + createTable("CREATE TABLE tpch.fact (\n" + + " id INT,\n" + + " k INT,\n" + + " v INT\n" + + ") ENGINE=OLAP\n" + + "DUPLICATE KEY(id)\n" + + "DISTRIBUTED BY HASH(id) BUCKETS 1\n" + + "PROPERTIES ('replication_num' = '1')"); + createTable("CREATE TABLE tpch.dim (\n" + + " did INT,\n" + + " k INT,\n" + + " tag INT\n" + + ") ENGINE=OLAP\n" + + "DUPLICATE KEY(did)\n" + + "DISTRIBUTED BY HASH(did) BUCKETS 1\n" + + "PROPERTIES ('replication_num' = '1')"); + + String sql = "SELECT d.did, f.id, f.k, f.v " + + "FROM fact f, dim d " + + "WHERE f.k = d.k " + + " AND f.v * 2 > (" + + " SELECT SUM(f2.v) " + + " FROM fact f2 " + + " WHERE f2.k = d.k" + + " )"; + + Plan plan = PlanChecker.from(createCascadesContext(sql)) + .analyze(sql) + .applyBottomUp(new PullUpProjectUnderApply()) + .applyTopDown(new PushDownFilterThroughProject()) + .customRewrite(new EliminateUnnecessaryProject()) + .customRewrite(new AggScalarSubQueryToWindowFunction()) + .rewrite() + .getPlan(); + + List> windows = plan.collectToList(LogicalWindow.class::isInstance); + Assertions.assertEquals(1, windows.size()); + + LogicalWindow window = windows.get(0); + List windowExpressions = window.getWindowExpressions(); + Assertions.assertEquals(1, windowExpressions.size()); + + WindowExpression windowExpression = (WindowExpression) windowExpressions.get(0).child(0); + Set partitionKeys = windowExpression.getPartitionKeys().stream() + .map(Expression.class::cast) + .filter(Slot.class::isInstance) + .map(Slot.class::cast) + .map(Slot::getName) + .collect(Collectors.toSet()); + Assertions.assertTrue(partitionKeys.containsAll(ImmutableSet.of("did", "k")), + "partition keys must include the outer-only relation slots that distinguish duplicated rows"); + } + private void check(String sql) { System.out.printf("Test:\n%s\n\n", sql); Plan plan = PlanChecker.from(createCascadesContext(sql)) diff --git a/regression-test/data/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.out b/regression-test/data/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.out new file mode 100644 index 00000000000000..1db657f572a0ef --- /dev/null +++ b/regression-test/data/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !d26072 -- +10 2 1 7 +11 2 1 7 +20 4 2 10 +30 5 3 8 + diff --git a/regression-test/suites/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.groovy b/regression-test/suites/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.groovy new file mode 100644 index 00000000000000..2f879efa4af6ce --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.groovy @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("correlated_scalar_subquery_to_window_function") { + sql "DROP TABLE IF EXISTS fact FORCE" + sql "DROP TABLE IF EXISTS dim FORCE" + + sql """ + CREATE TABLE fact ( + id INT, + k INT, + v INT + ) ENGINE=OLAP + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ('replication_num' = '1') + """ + + sql """ + CREATE TABLE dim ( + did INT, + k INT, + tag INT + ) ENGINE=OLAP + DUPLICATE KEY(did) + DISTRIBUTED BY HASH(did) BUCKETS 1 + PROPERTIES ('replication_num' = '1') + """ + + sql """ + insert into fact values + (1, 1, 5), + (2, 1, 7), + (3, 2, 4), + (4, 2, 10), + (5, 3, 8) + """ + + sql """ + insert into dim values + (10, 1, 1), + (11, 1, 1), + (20, 2, 1), + (30, 3, 0), + (31, null, 1) + """ + + order_qt_d26072 """ + SELECT d.did, f.id, f.k, f.v + FROM fact f, dim d + WHERE f.k = d.k + AND f.v * 2 > ( + SELECT SUM(f2.v) + FROM fact f2 + WHERE f2.k = d.k + ) + ORDER BY d.did, f.id + """ +} \ No newline at end of file From 98db5635b25115bdb94a35e899516abf1b4718b4 Mon Sep 17 00:00:00 2001 From: lichi Date: Thu, 28 May 2026 10:49:40 +0800 Subject: [PATCH 2/2] fix comments --- .../AggScalarSubQueryToWindowFunction.java | 7 +++ ...AggScalarSubQueryToWindowFunctionTest.java | 56 ++++++++++++++++--- ...ted_scalar_subquery_to_window_function.out | 6 ++ ..._scalar_subquery_to_window_function.groovy | 15 +++++ 4 files changed, 77 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java index 3054f5bc7e18d1..74ce48b60e0b91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java @@ -295,6 +295,13 @@ private boolean checkRelation(LogicalApply apply) { if (partitionBySlots.isEmpty()) { return false; } + if (!partitionBySlots.stream().map(Slot::getExprId).collect(Collectors.toSet()) + .equals(correlatedRelationOutput)) { + // The rewrite is only safe when the window can partition by the full visible row image of the + // outer-only relation. If some of those slots have been pruned from apply.left(), duplicate + // outer rows can collapse into one partition and multiply the aggregate result. + return false; + } return correlatedSlots.stream().allMatch(e -> correlatedRelationOutput.contains(e.getExprId())); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunctionTest.java index 9ecc77a873232a..f8f70b9bc431d4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunctionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunctionTest.java @@ -349,7 +349,7 @@ public void testNotMatchTheRule() { @Test public void testWindowPartitionsByOuterOnlyRelationSlots() throws Exception { - createTable("CREATE TABLE tpch.fact (\n" + createTable("CREATE TABLE tpch.fact_window_safe (\n" + " id INT,\n" + " k INT,\n" + " v INT\n" @@ -357,7 +357,7 @@ public void testWindowPartitionsByOuterOnlyRelationSlots() throws Exception { + "DUPLICATE KEY(id)\n" + "DISTRIBUTED BY HASH(id) BUCKETS 1\n" + "PROPERTIES ('replication_num' = '1')"); - createTable("CREATE TABLE tpch.dim (\n" + createTable("CREATE TABLE tpch.dim_window_safe (\n" + " did INT,\n" + " k INT,\n" + " tag INT\n" @@ -366,12 +366,12 @@ public void testWindowPartitionsByOuterOnlyRelationSlots() throws Exception { + "DISTRIBUTED BY HASH(did) BUCKETS 1\n" + "PROPERTIES ('replication_num' = '1')"); - String sql = "SELECT d.did, f.id, f.k, f.v " - + "FROM fact f, dim d " + String sql = "SELECT d.did, d.k, d.tag, f.id, f.v " + + "FROM fact_window_safe f, dim_window_safe d " + "WHERE f.k = d.k " + " AND f.v * 2 > (" + " SELECT SUM(f2.v) " - + " FROM fact f2 " + + " FROM fact_window_safe f2 " + " WHERE f2.k = d.k" + " )"; @@ -398,8 +398,50 @@ public void testWindowPartitionsByOuterOnlyRelationSlots() throws Exception { .map(Slot.class::cast) .map(Slot::getName) .collect(Collectors.toSet()); - Assertions.assertTrue(partitionKeys.containsAll(ImmutableSet.of("did", "k")), - "partition keys must include the outer-only relation slots that distinguish duplicated rows"); + Assertions.assertEquals(ImmutableSet.of("did", "k", "tag"), partitionKeys); + } + + @Test + public void testNotMatchWhenOuterOnlyRelationOutputIsPruned() throws Exception { + createTable("CREATE TABLE tpch.fact_window_pruned (\n" + + " id INT,\n" + + " k INT,\n" + + " v INT\n" + + ") ENGINE=OLAP\n" + + "DUPLICATE KEY(id)\n" + + "DISTRIBUTED BY HASH(id) BUCKETS 1\n" + + "PROPERTIES ('replication_num' = '1')"); + createTable("CREATE TABLE tpch.dim_window_pruned (\n" + + " did INT,\n" + + " k INT,\n" + + " tag INT\n" + + ") ENGINE=OLAP\n" + + "DUPLICATE KEY(did)\n" + + "DISTRIBUTED BY HASH(did) BUCKETS 1\n" + + "PROPERTIES ('replication_num' = '1')"); + + String sql = "SELECT t.id, t.k, t.v " + + "FROM (" + + " SELECT f.id, d.k, f.v " + + " FROM fact_window_pruned f, dim_window_pruned d " + + " WHERE f.k = d.k" + + ") t " + + "WHERE t.v * 2 > (" + + " SELECT SUM(f2.v) " + + " FROM fact_window_pruned f2 " + + " WHERE f2.k = t.k" + + " )"; + + Plan plan = PlanChecker.from(createCascadesContext(sql)) + .analyze(sql) + .applyBottomUp(new PullUpProjectUnderApply()) + .applyTopDown(new PushDownFilterThroughProject()) + .customRewrite(new EliminateUnnecessaryProject()) + .customRewrite(new AggScalarSubQueryToWindowFunction()) + .rewrite() + .getPlan(); + + Assertions.assertFalse(plan.anyMatch(LogicalWindow.class::isInstance)); } private void check(String sql) { diff --git a/regression-test/data/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.out b/regression-test/data/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.out index 1db657f572a0ef..5c34f7a5583243 100644 --- a/regression-test/data/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.out +++ b/regression-test/data/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.out @@ -5,3 +5,9 @@ 20 4 2 10 30 5 3 8 +-- !d26072_no_change -- +2 1 7 +2 1 7 +4 2 10 +5 3 8 + diff --git a/regression-test/suites/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.groovy b/regression-test/suites/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.groovy index 2f879efa4af6ce..5fd7bfe2f928b3 100644 --- a/regression-test/suites/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.groovy +++ b/regression-test/suites/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.groovy @@ -70,4 +70,19 @@ suite("correlated_scalar_subquery_to_window_function") { ) ORDER BY d.did, f.id """ + + order_qt_d26072_no_change """ + SELECT t.id, t.k, t.v + FROM ( + SELECT f.id, d.k, f.v + FROM fact f, dim d + WHERE f.k = d.k + ) t + WHERE t.v * 2 > ( + SELECT SUM(f2.v) + FROM fact f2 + WHERE f2.k = t.k + ) + ORDER BY t.id, t.k, t.v + """ } \ No newline at end of file