Skip to content

feat(aqe): delay join decision & introduce broadcast join in AQE#1752

Open
milenkovicm wants to merge 10 commits into
apache:mainfrom
milenkovicm:feat_dynamic_join_selection
Open

feat(aqe): delay join decision & introduce broadcast join in AQE#1752
milenkovicm wants to merge 10 commits into
apache:mainfrom
milenkovicm:feat_dynamic_join_selection

Conversation

@milenkovicm
Copy link
Copy Markdown
Contributor

@milenkovicm milenkovicm commented May 23, 2026

Overview

This commit implements dynamic join strategy selection in Ballista's Adaptive Query Execution (AQE) pipeline. Instead of choosing a join strategy at planning time (when data sizes are unknown), the plan now inserts a placeholder DynamicJoinSelectionExec node that defers the decision until runtime statistics are available.

Before (planning time)
  HashJoinExec / SortMergeJoinExec
      ↑ chosen by DataFusion optimizer (no runtime stats)

After (AQE flow)
  Step 1 – plan creation:
    DelayJoinSelectionRule → DynamicJoinSelectionExec  (placeholder)

  Step 2 – stage completes, stats available:
    SelectJoinRule → CollectLeft | LateCollectLeft | Hash | Sort | Repartition
                          ↑ decided using actual byte / row counts

The change means Ballista can now broadcast small tables or swap join sides based on real data sizes observed during execution, not just estimates from the logical plan.

Introduced planner rules consider few configuration values to decide join type:

  • datafusion.optimizer.hash_join_single_partition_threshold_rows, to select broadcast join or not
  • datafusion.optimizer.hash_join_single_partition_threshold, to select broadcast join or not
  • datafusion.optimizer.prefer_hash_join to select actual preferred join implementation

with dynamic join selection some TPCH jobs have less stages created

Screenshot 2026-05-23 at 20 51 03

compared to static scheduling

Screenshot 2026-05-23 at 20 55 20

TODO

  • broadcast join gives bad results for TPCH 2, 13, 16
  • configuration to disable rule ballista.planner.adaptive_join.enabled
  • test with sort join
  • decision should we stick with ballista.optimizer.broadcast_join_threshold_bytes or go with datafusion configuration values
  • more testing
  • TPCDS Q72 test
  • code clean up

Context

this PR is part of #1359

Testing

currently relevant configuration values are not set, to this feature for testing configure :

ballista.planner.adaptive.enabled=true
datafusion.optimizer.hash_join_single_partition_threshold=10485760
datafusion.optimizer.hash_join_single_partition_threshold_rows=100_00

@milenkovicm milenkovicm force-pushed the feat_dynamic_join_selection branch from 2f4e895 to 4a69e05 Compare May 23, 2026 20:27
}))
}

pub(crate) fn children_are_ready(&self) -> bool {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Horrible method name

Comment thread ballista/scheduler/src/state/aqe/mod.rs Outdated
Comment thread ballista/scheduler/src/state/aqe/mod.rs Outdated
@milenkovicm
Copy link
Copy Markdown
Contributor Author

Q2 (Static Plan)

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Job 0buY2Mw physical plan:
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SortPreservingMergeExec: [s_acctbal@0 DESC, n_name@2 ASC NULLS LAST, s_name@1 ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], fetch=100
  SortExec: TopK(fetch=100), expr=[s_acctbal@0 DESC, n_name@2 ASC NULLS LAST, s_name@1 ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], preserve_partitioning=[true]
    ProjectionExec: expr=[s_acctbal@5 as s_acctbal, s_name@2 as s_name, n_name@7 as n_name, p_partkey@0 as p_partkey, p_mfgr@1 as p_mfgr, s_address@3 as s_address, s_phone@4 as s_phone, s_comment@6 as s_comment]
      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(p_partkey@0, ps_partkey@1), (ps_supplycost@7, min(partsupp.ps_supplycost)@0)], projection=[p_partkey@0, p_mfgr@1, s_name@2, s_address@3, s_phone@4, s_acctbal@5, s_comment@6, n_name@8]
        CoalescePartitionsExec
          HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(r_regionkey@0, n_regionkey@9)], projection=[p_partkey@1, p_mfgr@2, s_name@3, s_address@4, s_phone@5, s_acctbal@6, s_comment@7, ps_supplycost@8, n_name@9]
            FilterExec: r_name@1 = EUROPE, projection=[r_regionkey@0]
              DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/region/part-0.parquet]]}, projection=[r_regionkey, r_name], file_type=parquet, predicate=r_name@1 = EUROPE, pruning_predicate=r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1, required_guarantees=[r_name in (EUROPE)]
            ProjectionExec: expr=[p_partkey@2 as p_partkey, p_mfgr@3 as p_mfgr, s_name@4 as s_name, s_address@5 as s_address, s_phone@6 as s_phone, s_acctbal@7 as s_acctbal, s_comment@8 as s_comment, ps_supplycost@9 as ps_supplycost, n_name@0 as n_name, n_regionkey@1 as n_regionkey]
              HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(n_nationkey@0, s_nationkey@4)], projection=[n_name@1, n_regionkey@2, p_partkey@3, p_mfgr@4, s_name@5, s_address@6, s_phone@8, s_acctbal@9, s_comment@10, ps_supplycost@11]
                DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/nation/part-0.parquet]]}, projection=[n_nationkey, n_name, n_regionkey], file_type=parquet, predicate=DynamicFilter [ empty ]
                ProjectionExec: expr=[p_partkey@6 as p_partkey, p_mfgr@7 as p_mfgr, s_name@0 as s_name, s_address@1 as s_address, s_nationkey@2 as s_nationkey, s_phone@3 as s_phone, s_acctbal@4 as s_acctbal, s_comment@5 as s_comment, ps_supplycost@8 as ps_supplycost]
                  HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(s_suppkey@0, ps_suppkey@2)], projection=[s_name@1, s_address@2, s_nationkey@3, s_phone@4, s_acctbal@5, s_comment@6, p_partkey@7, p_mfgr@8, ps_supplycost@10]
                    DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/supplier/part-0.parquet]]}, projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment], file_type=parquet, predicate=DynamicFilter [ empty ]
                    HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(p_partkey@0, ps_partkey@0)], projection=[p_partkey@0, p_mfgr@1, ps_suppkey@3, ps_supplycost@4]
                      CoalescePartitionsExec
                        FilterExec: p_size@3 = 15 AND p_type@2 LIKE %BRASS, projection=[p_partkey@0, p_mfgr@1]
                          DataSourceExec: file_groups={7 groups: [[Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-0.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-1.parquet], [Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-10.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-11.parquet], [Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-12.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-13.parquet], [Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-2.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-3.parquet], [Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-4.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-5.parquet], ...]}, projection=[p_partkey, p_mfgr, p_type, p_size], file_type=parquet, predicate=p_size@5 = 15 AND p_type@4 LIKE %BRASS, pruning_predicate=p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1, required_guarantees=[p_size in (15)]
                      DataSourceExec: file_groups={8 groups: [[Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-0.parquet:0..2385925, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-1.parquet:0..1749648], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-1.parquet:1749648..2367980, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-10.parquet:0..2354492, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-11.parquet:0..1162749], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-11.parquet:1162749..2356504, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-12.parquet:0..2355805, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-13.parquet:0..586013], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-13.parquet:586013..2352367, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-2.parquet:0..2369219], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-2.parquet:2369219..2369944, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-3.parquet:0..2369786, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-4.parquet:0..1765062], ...]}, projection=[ps_partkey, ps_suppkey, ps_supplycost], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
        ProjectionExec: expr=[min(partsupp.ps_supplycost)@1 as min(partsupp.ps_supplycost), ps_partkey@0 as ps_partkey]
          AggregateExec: mode=FinalPartitioned, gby=[ps_partkey@0 as ps_partkey], aggr=[min(partsupp.ps_supplycost)]
            RepartitionExec: partitioning=Hash([ps_partkey@0], 8), input_partitions=8
              AggregateExec: mode=Partial, gby=[ps_partkey@0 as ps_partkey], aggr=[min(partsupp.ps_supplycost)]
                HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(r_regionkey@0, n_regionkey@2)], projection=[ps_partkey@1, ps_supplycost@2]
                  FilterExec: r_name@1 = EUROPE, projection=[r_regionkey@0]
                    DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/region/part-0.parquet]]}, projection=[r_regionkey, r_name], file_type=parquet, predicate=r_name@1 = EUROPE, pruning_predicate=r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1, required_guarantees=[r_name in (EUROPE)]
                  ProjectionExec: expr=[ps_partkey@1 as ps_partkey, ps_supplycost@2 as ps_supplycost, n_regionkey@0 as n_regionkey]
                    HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(n_nationkey@0, s_nationkey@2)], projection=[n_regionkey@1, ps_partkey@2, ps_supplycost@3]
                      DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/nation/part-0.parquet]]}, projection=[n_nationkey, n_regionkey], file_type=parquet, predicate=DynamicFilter [ empty ]
                      ProjectionExec: expr=[ps_partkey@1 as ps_partkey, ps_supplycost@2 as ps_supplycost, s_nationkey@0 as s_nationkey]
                        HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(s_suppkey@0, ps_suppkey@1)], projection=[s_nationkey@1, ps_partkey@2, ps_supplycost@4]
                          DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/supplier/part-0.parquet]]}, projection=[s_suppkey, s_nationkey], file_type=parquet, predicate=DynamicFilter [ empty ]
                          DataSourceExec: file_groups={8 groups: [[Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-0.parquet:0..2385925, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-1.parquet:0..1749648], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-1.parquet:1749648..2367980, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-10.parquet:0..2354492, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-11.parquet:0..1162749], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-11.parquet:1162749..2356504, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-12.parquet:0..2355805, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-13.parquet:0..586013], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-13.parquet:586013..2352367, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-2.parquet:0..2369219], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-2.parquet:2369219..2369944, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-3.parquet:0..2369786, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-4.parquet:0..1765062], ...]}, projection=[ps_partkey, ps_suppkey, ps_supplycost], file_type=parquet, predicate=DynamicFilter [ empty ]

Q2 (AQE)

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Job V3vvoKt physical plan:
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
AdaptiveDatafusionExec: is_final=true, plan_id=3, stage_id=6, stage_resolved=true
  SortExec: TopK(fetch=100), expr=[s_acctbal@0 DESC, n_name@2 ASC NULLS LAST, s_name@1 ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], preserve_partitioning=[false]
    ProjectionExec: expr=[s_acctbal@5 as s_acctbal, s_name@2 as s_name, n_name@7 as n_name, p_partkey@0 as p_partkey, p_mfgr@1 as p_mfgr, s_address@3 as s_address, s_phone@4 as s_phone, s_comment@6 as s_comment]
      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(p_partkey@0, ps_partkey@1), (ps_supplycost@7, min(partsupp.ps_supplycost)@0)], projection=[p_partkey@0, p_mfgr@1, s_name@2, s_address@3, s_phone@4, s_acctbal@5, s_comment@6, n_name@8]
        HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(r_regionkey@0, n_regionkey@9)], projection=[p_partkey@1, p_mfgr@2, s_name@3, s_address@4, s_phone@5, s_acctbal@6, s_comment@7, ps_supplycost@8, n_name@9]
          FilterExec: r_name@1 = EUROPE, projection=[r_regionkey@0]
            DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/region/part-0.parquet]]}, projection=[r_regionkey, r_name], file_type=parquet, predicate=r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE, pruning_predicate=r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1, required_guarantees=[r_name in (EUROPE)]
          ProjectionExec: expr=[p_partkey@2 as p_partkey, p_mfgr@3 as p_mfgr, s_name@4 as s_name, s_address@5 as s_address, s_phone@6 as s_phone, s_acctbal@7 as s_acctbal, s_comment@8 as s_comment, ps_supplycost@9 as ps_supplycost, n_name@0 as n_name, n_regionkey@1 as n_regionkey]
            HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(n_nationkey@0, s_nationkey@4)], projection=[n_name@1, n_regionkey@2, p_partkey@3, p_mfgr@4, s_name@5, s_address@6, s_phone@8, s_acctbal@9, s_comment@10, ps_supplycost@11]
              DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/nation/part-0.parquet]]}, projection=[n_nationkey, n_name, n_regionkey], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ]
              ProjectionExec: expr=[p_partkey@0 as p_partkey, p_mfgr@1 as p_mfgr, s_name@3 as s_name, s_address@4 as s_address, s_nationkey@5 as s_nationkey, s_phone@6 as s_phone, s_acctbal@7 as s_acctbal, s_comment@8 as s_comment, ps_supplycost@2 as ps_supplycost]
                HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(ps_suppkey@2, s_suppkey@0)], projection=[p_partkey@0, p_mfgr@1, ps_supplycost@3, s_name@5, s_address@6, s_nationkey@7, s_phone@8, s_acctbal@9, s_comment@10]
                  ExchangeExec: partitioning=None, plan_id=4, stage_id=3, stage_resolved=true, broadcast=true
                    HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(p_partkey@0, ps_partkey@0)], projection=[p_partkey@0, p_mfgr@1, ps_suppkey@3, ps_supplycost@4]
                      ExchangeExec: partitioning=None, plan_id=0, stage_id=0, stage_resolved=true, broadcast=true
                        FilterExec: p_size@3 = 15 AND p_type@2 LIKE %BRASS, projection=[p_partkey@0, p_mfgr@1]
                          DataSourceExec: file_groups={7 groups: [[Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-0.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-1.parquet], [Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-10.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-11.parquet], [Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-12.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-13.parquet], [Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-2.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-3.parquet], [Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-4.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-5.parquet], ...]}, projection=[p_partkey, p_mfgr, p_type, p_size], file_type=parquet, predicate=p_size@5 = 15 AND p_type@4 LIKE %BRASS AND p_size@5 = 15 AND p_type@4 LIKE %BRASS AND p_size@5 = 15 AND p_type@4 LIKE %BRASS AND p_size@5 = 15 AND p_type@4 LIKE %BRASS AND p_size@5 = 15 AND p_type@4 LIKE %BRASS AND p_size@5 = 15 AND p_type@4 LIKE %BRASS AND p_size@5 = 15 AND p_type@4 LIKE %BRASS AND p_size@5 = 15 AND p_type@4 LIKE %BRASS, pruning_predicate=p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1 AND p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1 AND p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1 AND p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1 AND p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1 AND p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1 AND p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1 AND p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1, required_guarantees=[p_size in (15)]
                      DataSourceExec: file_groups={8 groups: [[Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-0.parquet:0..2385925, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-1.parquet:0..1749648], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-1.parquet:1749648..2367980, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-10.parquet:0..2354492, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-11.parquet:0..1162749], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-11.parquet:1162749..2356504, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-12.parquet:0..2355805, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-13.parquet:0..586013], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-13.parquet:586013..2352367, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-2.parquet:0..2369219], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-2.parquet:2369219..2369944, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-3.parquet:0..2369786, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-4.parquet:0..1765062], ...]}, projection=[ps_partkey, ps_suppkey, ps_supplycost], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ]
                  DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/supplier/part-0.parquet]]}, projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ]
        ExchangeExec: partitioning=None, plan_id=5, stage_id=5, stage_resolved=true, broadcast=true
          ProjectionExec: expr=[min(partsupp.ps_supplycost)@1 as min(partsupp.ps_supplycost), ps_partkey@0 as ps_partkey]
            AggregateExec: mode=FinalPartitioned, gby=[ps_partkey@0 as ps_partkey], aggr=[min(partsupp.ps_supplycost)]
              ExchangeExec: partitioning=Hash([ps_partkey@0], 8), plan_id=6, stage_id=4, stage_resolved=true
                AggregateExec: mode=Partial, gby=[ps_partkey@0 as ps_partkey], aggr=[min(partsupp.ps_supplycost)]
                  HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(r_regionkey@0, n_regionkey@2)], projection=[ps_partkey@1, ps_supplycost@2]
                    FilterExec: r_name@1 = EUROPE, projection=[r_regionkey@0]
                      DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/region/part-0.parquet]]}, projection=[r_regionkey, r_name], file_type=parquet, predicate=r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE, pruning_predicate=r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1, required_guarantees=[r_name in (EUROPE)]
                    ProjectionExec: expr=[ps_partkey@1 as ps_partkey, ps_supplycost@2 as ps_supplycost, n_regionkey@0 as n_regionkey]
                      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(n_nationkey@0, s_nationkey@2)], projection=[n_regionkey@1, ps_partkey@2, ps_supplycost@3]
                        DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/nation/part-0.parquet]]}, projection=[n_nationkey, n_regionkey], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ]
                        ProjectionExec: expr=[ps_partkey@1 as ps_partkey, ps_supplycost@2 as ps_supplycost, s_nationkey@0 as s_nationkey]
                          HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s_suppkey@0, ps_suppkey@1)], projection=[s_nationkey@1, ps_partkey@2, ps_supplycost@4]
                            ExchangeExec: partitioning=Hash([s_suppkey@0], 8), plan_id=2, stage_id=2, stage_resolved=true
                              DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/supplier/part-0.parquet]]}, projection=[s_suppkey, s_nationkey], file_type=parquet
                            ExchangeExec: partitioning=Hash([ps_suppkey@1], 8), plan_id=1, stage_id=1, stage_resolved=true
                              DataSourceExec: file_groups={8 groups: [[Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-0.parquet:0..2385925, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-1.parquet:0..1749648], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-1.parquet:1749648..2367980, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-10.parquet:0..2354492, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-11.parquet:0..1162749], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-11.parquet:1162749..2356504, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-12.parquet:0..2355805, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-13.parquet:0..586013], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-13.parquet:586013..2352367, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-2.parquet:0..2369219], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-2.parquet:2369219..2369944, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-3.parquet:0..2369786, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-4.parquet:0..1765062], ...]}, projection=[ps_partkey, ps_suppkey, ps_supplycost], file_type=parquet

@milenkovicm
Copy link
Copy Markdown
Contributor Author

milenkovicm commented May 24, 2026

Partitions get messed up in the last stage

@milenkovicm
Copy link
Copy Markdown
Contributor Author

Note to myself: check #1643

#1643 mentions pushing down

FilterExec: r_name@1 = EUROPE, projection=[r_regionkey@0]

And doing broadcast join, this is true for aqe implementation

@milenkovicm milenkovicm force-pushed the feat_dynamic_join_selection branch from 4a69e05 to dbfacfa Compare May 25, 2026 13:21
@milenkovicm milenkovicm force-pushed the feat_dynamic_join_selection branch from dbfacfa to 6335c37 Compare May 25, 2026 15:09
@milenkovicm milenkovicm force-pushed the feat_dynamic_join_selection branch from 6335c37 to d803f06 Compare May 25, 2026 16:57
@milenkovicm milenkovicm changed the title WIP feat(aqe): delay join decision & introduce broadcast join in AQE [WIP] feat(aqe): delay join decision & introduce broadcast join in AQE May 25, 2026
Comment thread ballista/scheduler/src/state/aqe/execution_plan/dynamic_join.rs
Comment thread ballista/scheduler/src/state/aqe/mod.rs Outdated
Comment thread ballista/scheduler/src/state/aqe/optimizer_rule/join_selection.rs Outdated
Comment thread ballista/core/src/extension.rs Outdated
.set_u64(
"datafusion.optimizer.hash_join_single_partition_threshold",
0,
10 * 1024 * 1024,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert values before merging

Comment thread ballista/scheduler/src/state/aqe/execution_plan/dynamic_join.rs Outdated
Comment thread ballista/scheduler/src/state/aqe/execution_plan/dynamic_join.rs
Comment thread ballista/scheduler/src/state/aqe/execution_plan/dynamic_join.rs Outdated
/// Stage IDs are incremental and unique for each job.
stage_id_generator: usize,
/// plan id generator
#[allow(dead_code)] // TODO: keep it for now until we refactor it
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove it maybe?

Comment thread ballista/scheduler/src/state/execution_stage.rs Outdated
@milenkovicm
Copy link
Copy Markdown
Contributor Author

Q72 (SF1) Fails

AdaptiveDatafusionExec: is_final=false, plan_id=2, stage_id=pending, stage_resolved=false
  SortPreservingMergeExec: [total_cnt@5 DESC, i_item_desc@0 ASC NULLS LAST, w_warehouse_name@1 ASC NULLS LAST, d_week_seq@2 ASC NULLS LAST], fetch=100
    SortExec: TopK(fetch=100), expr=[total_cnt@5 DESC, i_item_desc@0 ASC NULLS LAST, w_warehouse_name@1 ASC NULLS LAST, d_week_seq@2 ASC NULLS LAST], preserve_partitioning=[true]
      ProjectionExec: expr=[i_item_desc@0 as i_item_desc, w_warehouse_name@1 as w_warehouse_name, d_week_seq@2 as d_week_seq, sum(CASE WHEN promotion.p_promo_sk IS NULL THEN Int64(1) ELSE Int64(0) END)@3 as no_promo, sum(CASE WHEN promotion.p_promo_sk IS NOT NULL THEN Int64(1) ELSE Int64(0) END)@4 as promo, count(Int64(1))@5 as total_cnt]
        AggregateExec: mode=FinalPartitioned, gby=[i_item_desc@0 as i_item_desc, w_warehouse_name@1 as w_warehouse_name, d_week_seq@2 as d_week_seq], aggr=[sum(CASE WHEN promotion.p_promo_sk IS NULL THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN promotion.p_promo_sk IS NOT NULL THEN Int64(1) ELSE Int64(0) END), count(Int64(1))]
          ExchangeExec: partitioning=Hash([i_item_desc@0, w_warehouse_name@1, d_week_seq@2], 14), plan_id=4, stage_id=3, stage_resolved=false
            AggregateExec: mode=Partial, gby=[i_item_desc@1 as i_item_desc, w_warehouse_name@0 as w_warehouse_name, d_week_seq@2 as d_week_seq], aggr=[sum(CASE WHEN promotion.p_promo_sk IS NULL THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN promotion.p_promo_sk IS NOT NULL THEN Int64(1) ELSE Int64(0) END), count(Int64(1))]
              HashJoinExec: mode=CollectLeft, join_type=Left, on=[(cs_item_sk@0, cr_item_sk@0), (cs_order_number@1, cr_order_number@1)], projection=[w_warehouse_name@2, i_item_desc@3, d_week_seq@4, p_promo_sk@5]
                ProjectionExec: expr=[cs_item_sk@1 as cs_item_sk, cs_order_number@2 as cs_order_number, w_warehouse_name@3 as w_warehouse_name, i_item_desc@4 as i_item_desc, d_week_seq@5 as d_week_seq, p_promo_sk@0 as p_promo_sk]
                  HashJoinExec: mode=CollectLeft, join_type=Right, on=[(p_promo_sk@0, cs_promo_sk@1)], projection=[p_promo_sk@0, cs_item_sk@1, cs_order_number@3, w_warehouse_name@4, i_item_desc@5, d_week_seq@6]
                    DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/1g_parquet/promotion.parquet]]}, projection=[p_promo_sk], file_type=parquet
                    HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(cs_ship_date_sk@0, d_date_sk@0)], filter=d_date@1 > d_date@0 + IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }, projection=[cs_item_sk@1, cs_promo_sk@2, cs_order_number@3, w_warehouse_name@4, i_item_desc@5, d_week_seq@7]
                      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(inv_date_sk@4, d_date_sk@0), (d_week_seq@8, d_week_seq@1)], projection=[cs_ship_date_sk@0, cs_item_sk@1, cs_promo_sk@2, cs_order_number@3, w_warehouse_name@5, i_item_desc@6, d_date@7, d_week_seq@8]
                        ProjectionExec: expr=[cs_ship_date_sk@2 as cs_ship_date_sk, cs_item_sk@3 as cs_item_sk, cs_promo_sk@4 as cs_promo_sk, cs_order_number@5 as cs_order_number, inv_date_sk@6 as inv_date_sk, w_warehouse_name@7 as w_warehouse_name, i_item_desc@8 as i_item_desc, d_date@0 as d_date, d_week_seq@1 as d_week_seq]
                          HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_date_sk@0, cs_sold_date_sk@0)], projection=[d_date@1, d_week_seq@2, cs_ship_date_sk@4, cs_item_sk@5, cs_promo_sk@6, cs_order_number@7, inv_date_sk@8, w_warehouse_name@9, i_item_desc@10]
                            FilterExec: d_year@3 = 1999, projection=[d_date_sk@0, d_date@1, d_week_seq@2]
                              DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/1g_parquet/date_dim.parquet]]}, projection=[d_date_sk, d_date, d_week_seq, d_year], file_type=parquet, predicate=d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1, required_guarantees=[d_year in (1999)]
                            HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(hd_demo_sk@0, cs_bill_hdemo_sk@2)], projection=[cs_sold_date_sk@1, cs_ship_date_sk@2, cs_item_sk@4, cs_promo_sk@5, cs_order_number@6, inv_date_sk@7, w_warehouse_name@8, i_item_desc@9]
                              FilterExec: hd_buy_potential@1 = 501-1000, projection=[hd_demo_sk@0]
                                DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/1g_parquet/household_demographics.parquet]]}, projection=[hd_demo_sk, hd_buy_potential], file_type=parquet, predicate=hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000, pruning_predicate=hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1, required_guarantees=[hd_buy_potential in (501-1000)]
                              HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(cs_bill_cdemo_sk@2, cd_demo_sk@0)], projection=[cs_sold_date_sk@0, cs_ship_date_sk@1, cs_bill_hdemo_sk@3, cs_item_sk@4, cs_promo_sk@5, cs_order_number@6, inv_date_sk@7, w_warehouse_name@8, i_item_desc@9]
                                HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(i_item_sk@0, cs_item_sk@4)], projection=[cs_sold_date_sk@2, cs_ship_date_sk@3, cs_bill_cdemo_sk@4, cs_bill_hdemo_sk@5, cs_item_sk@6, cs_promo_sk@7, cs_order_number@8, inv_date_sk@9, w_warehouse_name@10, i_item_desc@1]
                                  DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/1g_parquet/item.parquet]]}, projection=[i_item_sk, i_item_desc], file_type=parquet, predicate=DynamicFilter [ empty ]
                                  ExchangeExec: partitioning=None, plan_id=3, stage_id=2, stage_resolved=true, broadcast=true
                                    ProjectionExec: expr=[cs_sold_date_sk@1 as cs_sold_date_sk, cs_ship_date_sk@2 as cs_ship_date_sk, cs_bill_cdemo_sk@3 as cs_bill_cdemo_sk, cs_bill_hdemo_sk@4 as cs_bill_hdemo_sk, cs_item_sk@5 as cs_item_sk, cs_promo_sk@6 as cs_promo_sk, cs_order_number@7 as cs_order_number, inv_date_sk@8 as inv_date_sk, w_warehouse_name@0 as w_warehouse_name]
                                      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(w_warehouse_sk@0, inv_warehouse_sk@8)], projection=[w_warehouse_name@1, cs_sold_date_sk@2, cs_ship_date_sk@3, cs_bill_cdemo_sk@4, cs_bill_hdemo_sk@5, cs_item_sk@6, cs_promo_sk@7, cs_order_number@8, inv_date_sk@9]
                                        DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/1g_parquet/warehouse.parquet]]}, projection=[w_warehouse_sk, w_warehouse_name], file_type=parquet
                                        ProjectionExec: expr=[cs_sold_date_sk@0 as cs_sold_date_sk, cs_ship_date_sk@1 as cs_ship_date_sk, cs_bill_cdemo_sk@2 as cs_bill_cdemo_sk, cs_bill_hdemo_sk@3 as cs_bill_hdemo_sk, cs_item_sk@4 as cs_item_sk, cs_promo_sk@5 as cs_promo_sk, cs_order_number@6 as cs_order_number, inv_date_sk@8 as inv_date_sk, inv_warehouse_sk@10 as inv_warehouse_sk]
                                          SortMergeJoinExec: join_type=Inner, on=[(cs_item_sk@4, inv_item_sk@1)], filter=inv_quantity_on_hand@1 < cs_quantity@0
                                            SortExec: expr=[cs_item_sk@4 ASC], preserve_partitioning=[true]
                                              ExchangeExec: partitioning=Hash([cs_item_sk@4], 14), plan_id=0, stage_id=0, stage_resolved=true
                                                DataSourceExec: file_groups={14 groups: [[Users/marko/TMP/tpcds_data/1g_parquet/catalog_sales.parquet:0..7244714], [Users/marko/TMP/tpcds_data/1g_parquet/catalog_sales.parquet:7244714..14489428], [Users/marko/TMP/tpcds_data/1g_parquet/catalog_sales.parquet:14489428..21734142], [Users/marko/TMP/tpcds_data/1g_parquet/catalog_sales.parquet:21734142..28978856], [Users/marko/TMP/tpcds_data/1g_parquet/catalog_sales.parquet:28978856..36223570], ...]}, projection=[cs_sold_date_sk, cs_ship_date_sk, cs_bill_cdemo_sk, cs_bill_hdemo_sk, cs_item_sk, cs_promo_sk, cs_order_number, cs_quantity], file_type=parquet
                                            SortExec: expr=[inv_item_sk@1 ASC], preserve_partitioning=[true]
                                              ExchangeExec: partitioning=Hash([inv_item_sk@1], 14), plan_id=1, stage_id=1, stage_resolved=true
                                                DataSourceExec: file_groups={14 groups: [[Users/marko/TMP/tpcds_data/1g_parquet/inventory.parquet:0..2725031], [Users/marko/TMP/tpcds_data/1g_parquet/inventory.parquet:2725031..5450062], [Users/marko/TMP/tpcds_data/1g_parquet/inventory.parquet:5450062..8175093], [Users/marko/TMP/tpcds_data/1g_parquet/inventory.parquet:8175093..10900124], [Users/marko/TMP/tpcds_data/1g_parquet/inventory.parquet:10900124..13625155], ...]}, projection=[inv_date_sk, inv_item_sk, inv_warehouse_sk, inv_quantity_on_hand], file_type=parquet
                                FilterExec: cd_marital_status@1 = S, projection=[cd_demo_sk@0]
                                  DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/1g_parquet/customer_demographics.parquet]]}, projection=[cd_demo_sk, cd_marital_status], file_type=parquet, predicate=cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND DynamicFilter [ empty ], pruning_predicate=cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1, required_guarantees=[cd_marital_status in (S)]
                        DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/1g_parquet/date_dim.parquet]]}, projection=[d_date_sk, d_week_seq], file_type=parquet, predicate=DynamicFilter [ empty ]
                      DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/1g_parquet/date_dim.parquet]]}, projection=[d_date_sk, d_date], file_type=parquet, predicate=DynamicFilter [ empty ]
                DataSourceExec: file_groups={14 groups: [[Users/marko/TMP/tpcds_data/1g_parquet/catalog_returns.parquet:0..806517], [Users/marko/TMP/tpcds_data/1g_parquet/catalog_returns.parquet:806517..1613034], [Users/marko/TMP/tpcds_data/1g_parquet/catalog_returns.parquet:1613034..2419551], [Users/marko/TMP/tpcds_data/1g_parquet/catalog_returns.parquet:2419551..3226068], [Users/marko/TMP/tpcds_data/1g_parquet/catalog_returns.parquet:3226068..4032585], ...]}, projection=[cr_item_sk, cr_order_number], file_type=parquet

stage 3 fails with (all running tasks): DataFusionError(Shared(Shared(Shared(Shared(ArrowError(OffsetOverflowError(2147506937), Some(\"\")))))))\n")), None)

SortShuffleWriterExec: partitioning=Hash([i_item_desc@0, w_warehouse_name@1, d_week_seq@2], 14)
  AggregateExec: mode=Partial, gby=[i_item_desc@1 as i_item_desc, w_warehouse_name@0 as w_warehouse_name, d_week_seq@2 as d_week_seq], aggr=[sum(CASE WHEN promotion.p_promo_sk IS NULL THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN promotion.p_promo_sk IS NOT NULL THEN Int64(1) ELSE Int64(0) END), count(Int64(1))]
    HashJoinExec: mode=CollectLeft, join_type=Left, on=[(cs_item_sk@0, cr_item_sk@0), (cs_order_number@1, cr_order_number@1)], projection=[w_warehouse_name@2, i_item_desc@3, d_week_seq@4, p_promo_sk@5]
      ProjectionExec: expr=[cs_item_sk@1 as cs_item_sk, cs_order_number@2 as cs_order_number, w_warehouse_name@3 as w_warehouse_name, i_item_desc@4 as i_item_desc, d_week_seq@5 as d_week_seq, p_promo_sk@0 as p_promo_sk]
        HashJoinExec: mode=CollectLeft, join_type=Right, on=[(p_promo_sk@0, cs_promo_sk@1)], projection=[p_promo_sk@0, cs_item_sk@1, cs_order_number@3, w_warehouse_name@4, i_item_desc@5, d_week_seq@6]
          DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/1g_parquet/promotion.parquet]]}, projection=[p_promo_sk], file_type=parquet
          HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(cs_ship_date_sk@0, d_date_sk@0)], filter=d_date@1 > d_date@0 + IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }, projection=[cs_item_sk@1, cs_promo_sk@2, cs_order_number@3, w_warehouse_name@4, i_item_desc@5, d_week_seq@7]
            HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(inv_date_sk@4, d_date_sk@0), (d_week_seq@8, d_week_seq@1)], projection=[cs_ship_date_sk@0, cs_item_sk@1, cs_promo_sk@2, cs_order_number@3, w_warehouse_name@5, i_item_desc@6, d_date@7, d_week_seq@8]
              ProjectionExec: expr=[cs_ship_date_sk@2 as cs_ship_date_sk, cs_item_sk@3 as cs_item_sk, cs_promo_sk@4 as cs_promo_sk, cs_order_number@5 as cs_order_number, inv_date_sk@6 as inv_date_sk, w_warehouse_name@7 as w_warehouse_name, i_item_desc@8 as i_item_desc, d_date@0 as d_date, d_week_seq@1 as d_week_seq]
                HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_date_sk@0, cs_sold_date_sk@0)], projection=[d_date@1, d_week_seq@2, cs_ship_date_sk@4, cs_item_sk@5, cs_promo_sk@6, cs_order_number@7, inv_date_sk@8, w_warehouse_name@9, i_item_desc@10]
                  FilterExec: d_year@3 = 1999, projection=[d_date_sk@0, d_date@1, d_week_seq@2]
                    DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/1g_parquet/date_dim.parquet]]}, projection=[d_date_sk, d_date, d_week_seq, d_year], file_type=parquet, predicate=d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1, required_guarantees=[d_year in (1999)]
                  HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(hd_demo_sk@0, cs_bill_hdemo_sk@2)], projection=[cs_sold_date_sk@1, cs_ship_date_sk@2, cs_item_sk@4, cs_promo_sk@5, cs_order_number@6, inv_date_sk@7, w_warehouse_name@8, i_item_desc@9]
                    FilterExec: hd_buy_potential@1 = 501-1000, projection=[hd_demo_sk@0]
                      DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/1g_parquet/household_demographics.parquet]]}, projection=[hd_demo_sk, hd_buy_potential], file_type=parquet, predicate=hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000, pruning_predicate=hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1, required_guarantees=[hd_buy_potential in (501-1000)]
                    HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(cs_bill_cdemo_sk@2, cd_demo_sk@0)], projection=[cs_sold_date_sk@0, cs_ship_date_sk@1, cs_bill_hdemo_sk@3, cs_item_sk@4, cs_promo_sk@5, cs_order_number@6, inv_date_sk@7, w_warehouse_name@8, i_item_desc@9]
                      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(i_item_sk@0, cs_item_sk@4)], projection=[cs_sold_date_sk@2, cs_ship_date_sk@3, cs_bill_cdemo_sk@4, cs_bill_hdemo_sk@5, cs_item_sk@6, cs_promo_sk@7, cs_order_number@8, inv_date_sk@9, w_warehouse_name@10, i_item_desc@1]
                        DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/1g_parquet/item.parquet]]}, projection=[i_item_sk, i_item_desc], file_type=parquet, predicate=DynamicFilter [ empty ]
                        ShuffleReaderExec: upstream_stage: 2, broadcast: true, upstream_partition_count: 14
                      FilterExec: cd_marital_status@1 = S, projection=[cd_demo_sk@0]
                        DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/1g_parquet/customer_demographics.parquet]]}, projection=[cd_demo_sk, cd_marital_status], file_type=parquet, predicate=cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND DynamicFilter [ empty ], pruning_predicate=cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1, required_guarantees=[cd_marital_status in (S)]
              DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/1g_parquet/date_dim.parquet]]}, projection=[d_date_sk, d_week_seq], file_type=parquet, predicate=DynamicFilter [ empty ]
            DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/1g_parquet/date_dim.parquet]]}, projection=[d_date_sk, d_date], file_type=parquet, predicate=DynamicFilter [ empty ]
      DataSourceExec: file_groups={14 groups: [[Users/marko/TMP/tpcds_data/1g_parquet/catalog_returns.parquet:0..806517], [Users/marko/TMP/tpcds_data/1g_parquet/catalog_returns.parquet:806517..1613034], [Users/marko/TMP/tpcds_data/1g_parquet/catalog_returns.parquet:1613034..2419551], [Users/marko/TMP/tpcds_data/1g_parquet/catalog_returns.parquet:2419551..3226068], [Users/marko/TMP/tpcds_data/1g_parquet/catalog_returns.parquet:3226068..4032585], ...]}, projection=[cr_item_sk, cr_order_number], file_type=parquet

@milenkovicm
Copy link
Copy Markdown
Contributor Author

TPCH (SF10) AQE Enabled

Query 1 took 0.990 s and returned 4 rows
Query 2 took 0.514 s and returned 100 rows
Query 3 took 1.196 s and returned 10 rows
Query 4 took 0.775 s and returned 5 rows
Query 5 took 3.186 s and returned 5 rows
Query 6 took 0.261 s and returned 1 rows
Query 7 took 2.256 s and returned 4 rows
Query 8 took 0.722 s and returned 2 rows
Query 9 took 1.493 s and returned 168 rows
Query 10 took 1.246 s and returned 20 rows
Query 11 took 0.622 s and returned 0 rows
Query 12 took 0.821 s and returned 2 rows
Query 13 took 1.334 s and returned 46 rows
Query 14 took 0.401 s and returned 1 rows
Query 15 took 0.435 s and returned 0 rows
Query 16 took 0.320 s and returned 27840 rows
Query 17 took 3.115 s and returned 1 rows
Query 18 took 5.478 s and returned 100 rows
Query 19 took 0.831 s and returned 1 rows
Query 20 took 0.764 s and returned 1804 rows
Query 21 took 3.321 s and returned 100 rows
Query 22 took 0.549 s and returned 7 rows
Total time: 30.632 s
image

TPCH (SF10) AQE Disabled

Query 1 took 1.002 s and returned 4 rows
Query 2 took 1.096 s and returned 100 rows
Query 3 took 1.481 s and returned 10 rows
Query 4 took 0.764 s and returned 5 rows
Query 5 took 3.157 s and returned 5 rows
Query 6 took 0.267 s and returned 1 rows
Query 7 took 4.247 s and returned 4 rows
Query 8 took 4.588 s and returned 2 rows
Query 9 took 6.123 s and returned 168 rows
Query 10 took 1.326 s and returned 20 rows
Query 11 took 0.861 s and returned 0 rows
Query 12 took 0.992 s and returned 2 rows
Query 13 took 1.264 s and returned 46 rows
Query 14 took 0.423 s and returned 1 rows
Query 15 took 0.439 s and returned 0 rows
Query 16 took 0.314 s and returned 27840 rows
Query 17 took 4.774 s and returned 1 rows
Query 18 took 3.708 s and returned 100 rows
Query 19 took 0.859 s and returned 1 rows
Query 20 took 0.864 s and returned 1804 rows
Query 21 took 7.489 s and returned 100 rows
Query 22 took 0.534 s and returned 7 rows
Total time: 46.572 s
image

@milenkovicm milenkovicm requested a review from Dandandan May 27, 2026 18:53
@milenkovicm milenkovicm changed the title [WIP] feat(aqe): delay join decision & introduce broadcast join in AQE feat(aqe): delay join decision & introduce broadcast join in AQE May 27, 2026
@milenkovicm milenkovicm marked this pull request as ready for review May 27, 2026 18:58
@milenkovicm milenkovicm requested a review from martin-g May 27, 2026 18:59

Ok(Self {
stage_id_generator: 0,
stage_id_generator: 0, // FIXME: compatibility issue with static where stages start from 1
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For follow up once this get merged

pub enum JoinSelectionAction {
Repartition(Arc<DynamicJoinSelectionExec>),
CollectLeft(Arc<HashJoinExec>),
LateCollectLeft(Arc<HashJoinExec>),
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This action gets triggered when both sides get repartitioned, does it make sense to change from partitioned to collect left join at that point ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant