feat(aqe): port Spark's OptimizeSkewedJoin #1779
Draft
wirybeaver wants to merge 4 commits into
Lays the foundation for porting Spark's OptimizeSkewedJoin AQE rule. This
commit adds only data — no rule, no plan-tree integration, no behavior
change. Subsequent commits wire the carrier into ShuffleReaderExec / the
adapter (C2) and add the rule itself (C3).
- `BallistaConfig` knobs (all opt-in; `skew_join.enabled` defaults to `false`):
  - `skew_join.skewed_partition_factor` (5.0): Spark's
    `skewedPartitionFactor`
  - `skew_join.skewed_partition_threshold_bytes` (256 MiB): Spark's
    `skewedPartitionThresholdInBytes`
  - `skew_join.advisory_partition_bytes` (64 MiB): target sub-shard size
    for the bin-packer; decoupled from `coalesce.target_partition_bytes`
    so the two rules tune independently
  - `skew_join.small_partition_factor` (0.2): tail-merge factor from
    Spark's `splitSizeListByTargetSize` legacy
- `SessionConfigExt` accessor pairs for each knob (mirrors the existing
coalesce-config accessor pattern).
- `SkewJoinPlan` / `SkewJoinShard` carrier types in shuffle_reader.rs,
mirroring `CoalescePlan` / `PartitionGroup`. `SkewJoinShard` encodes
Spark's `PartialReducerPartitionSpec(reducerIdx, startMapIdx,
endMapIdx)` semantics. Plan attaches to both legs of a join's
alignment group; `shards.len() == K'` matches across legs so the
planner builds matched join inputs by zipping the two legs.
`cargo check --workspace` clean.
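As a rough illustration of the carrier types described above, the sketch below models a shard as a half-open mapper window and pairs the two legs by position. The field names come from this PR's concept mapping; the field types, the derives, and the `zip_legs` helper are assumptions made for the example, not the code in shuffle_reader.rs.

```rust
/// One output partition of a skew-join read: a contiguous window of mapper
/// outputs taken from a single upstream (reducer) partition.
/// Field types are assumed for this sketch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkewJoinShard {
    pub upstream_idx: usize,
    pub start_map_idx: usize, // inclusive
    pub end_map_idx: usize,   // exclusive
}

/// Carrier attached to one leg of a join; `shards.len()` is K'.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkewJoinPlan {
    pub shards: Vec<SkewJoinShard>,
}

/// Hypothetical helper: pairs the i-th shard of the left leg with the i-th
/// shard of the right leg. Returns `None` when the legs disagree on K'.
pub fn zip_legs<'a>(
    left: &'a SkewJoinPlan,
    right: &'a SkewJoinPlan,
) -> Option<Vec<(&'a SkewJoinShard, &'a SkewJoinShard)>> {
    if left.shards.len() != right.shards.len() {
        return None;
    }
    Some(left.shards.iter().zip(right.shards.iter()).collect())
}
```

With a left leg split into `[0, 2)` and `[2, 4)` and a right leg that repeats `[0, 4)` twice, `zip_legs` yields two matched pairs, which is the shape the planner would hand to the join.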
…rExec / adapter
Adds the plumbing layer that lets a future rule attach a per-stage skew-join
decision and have the adapter materialize it into a shuffle reader with
per-mapper sub-range slicing. Still no rule and no production behavior
change — `set_skew_join` is only called by the new unit tests.
- `ExchangeExec`: new `skew_join` slot (mirror of the existing `coalesce`
slot). `set_skew_join` / `skew_join()` accessors, threaded through
`with_new_children`, and rendered in EXPLAIN as `skew_join=K' of M`.
- `ShuffleReaderExec`: new `skew_join: Option<SkewJoinPlan>` field plus a
`try_new_skew_join` constructor mirroring `try_new_coalesced`. Debug
asserts on K'-shape invariants. Threaded through `with_work_dir`,
`with_client_pool`, `with_new_children`, and EXPLAIN.
- `BallistaAdapter::transform_children`: extended from 2-arm to 4-arm
  match on `(coalesce, skew_join)`:
  - `(Some, Some)` → `exec_err` (mutually exclusive by construction)
  - `(Some, None)` → existing coalesce path
  - `(None, Some(sp))` → slice each upstream's `Vec<PartitionLocation>`
    to the shard's `[start_map_idx, end_map_idx)` window, preserve hash
    partitioning width at K' (the eventual `is_skew_join` flag on the
    join op in C4 is what relaxes "same key in one partition")
  - `(None, None)` → unchanged
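The four-arm dispatch can be sketched in isolation as follows. `CoalescePlan` and `SkewJoinPlan` here are empty stand-ins, and `ReaderChoice` / `choose_reader` are hypothetical names for the example; the real adapter builds a `ShuffleReaderExec` rather than returning an enum.

```rust
// Stand-ins for the real carrier types; only their presence matters here.
struct CoalescePlan;
struct SkewJoinPlan {
    shards: Vec<(usize, usize, usize)>, // (upstream_idx, start_map_idx, end_map_idx)
}

/// Hypothetical summary of which reader the adapter would build.
#[derive(Debug, PartialEq)]
enum ReaderChoice {
    Plain,
    Coalesced,
    SkewJoin { output_partitions: usize },
}

fn choose_reader(
    coalesce: Option<&CoalescePlan>,
    skew_join: Option<&SkewJoinPlan>,
) -> Result<ReaderChoice, String> {
    match (coalesce, skew_join) {
        // Both carriers set: the two rewrites are mutually exclusive.
        (Some(_), Some(_)) => {
            Err("coalesce and skew_join are mutually exclusive".to_string())
        }
        (Some(_), None) => Ok(ReaderChoice::Coalesced),
        // One output partition per shard, so the width is K' = shards.len().
        (None, Some(sp)) => Ok(ReaderChoice::SkewJoin {
            output_partitions: sp.shards.len(),
        }),
        (None, None) => Ok(ReaderChoice::Plain),
    }
}
```

Matching on the tuple keeps the mutual-exclusion error in the same place as the two happy paths, so a future fifth state cannot be added without the compiler flagging the match.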
Tests in `adapter.rs::tests`:
- `adapter_slices_skew_join_shards_by_map_range`: builds a 3×4 synthetic
upstream, splits upstream 0 two-way, upstream 1 four-way, leaves
upstream 2 as passthrough; asserts K'=7 with the exact
`(upstream_partition_id, map_partition_id)` pairs per output shard.
- `adapter_errors_when_both_coalesce_and_skew_join_set`: regression
guard for the mutual-exclusion invariant.
- `adapter_guards_out_of_bounds_skew_join_indices`: out-of-bounds
`upstream_idx` errors clearly; `end_map_idx > inner.len()` clamps;
`start >= end` yields an empty shard.
`cargo check --workspace` clean; full AQE suite (52 tests) passes.
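The slicing and guard behavior exercised by these tests might look like the sketch below. `Location`, `Shard`, `synthetic_upstreams`, and `slice_shards` are simplified, hypothetical stand-ins (the real code slices `Vec<PartitionLocation>`); the clamping rules follow the three guard cases listed above.

```rust
/// Simplified stand-in for a partition location.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Location {
    upstream_partition_id: usize,
    map_partition_id: usize,
}

struct Shard {
    upstream_idx: usize,
    start_map_idx: usize, // inclusive
    end_map_idx: usize,   // exclusive
}

/// Builds `upstreams` x `mappers` synthetic locations.
fn synthetic_upstreams(upstreams: usize, mappers: usize) -> Vec<Vec<Location>> {
    (0..upstreams)
        .map(|u| {
            (0..mappers)
                .map(|m| Location { upstream_partition_id: u, map_partition_id: m })
                .collect()
        })
        .collect()
}

/// One output partition per shard, each holding the mapper window it names.
fn slice_shards(
    upstreams: &[Vec<Location>],
    shards: &[Shard],
) -> Result<Vec<Vec<Location>>, String> {
    shards
        .iter()
        .map(|s| -> Result<Vec<Location>, String> {
            // An unknown upstream index is an error rather than a panic.
            let inner = upstreams
                .get(s.upstream_idx)
                .ok_or_else(|| format!("upstream_idx {} out of bounds", s.upstream_idx))?;
            // Clamp the end to the mapper count; start >= end gives an empty shard.
            let end = s.end_map_idx.min(inner.len());
            let start = s.start_map_idx.min(end);
            Ok(inner[start..end].to_vec())
        })
        .collect()
}
```

On a 3×4 input with upstream 0 split two-way, upstream 1 split four-way, and upstream 2 passed through, this produces 2 + 4 + 1 = 7 output partitions, matching the K'=7 shape in the first test.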
…te matching side

Ports Spark's OptimizeSkewedJoin into Ballista's AQE pipeline. When one side of a binary join has a partition whose bytes exceed BOTH `factor * median(per-side sizes)` AND an absolute threshold, the rule splits that side's per-mapper byte vector into contiguous `[start_map_idx, end_map_idx)` ranges (bin-packed near `advisory_partition_bytes` via the coalesce module's `split_size_list_by_target_size` helper) and cartesian-pairs them against the other side's ranges. Both sides' resulting paired `SkewJoinPlan`s land on the two leaf `ExchangeExec`s via the carrier slots from C2; the adapter consumes them to build K'-partition `ShuffleReaderExec`s with per-mapper sub-range slicing.

Apply Spark's per-join-type split-side allowlist (Inner: both, Left/LeftSemi/LeftAnti/LeftMark: left only, Right: right only, Full: neither). v1 handles exactly one binary join per stage subtree; multi-join stages bail (Spark iterates per-join, a follow-up can do the same).

Default off via `ballista.planner.skew_join.enabled`. Wired after `CoalescePartitionsRule` in `planner.rs::actionable_stages()`: the two carriers are mutually exclusive, so running coalesce first lets skew-join idempotently short-circuit on already-claimed leaves.

Algorithm helpers (`skew_join/algorithm.rs`): `is_skewed` dual-guard detection, `map_ranges_for_upstream` bin-packer (reuses coalesce's helper), `pair_shards` cartesian product, `robust_median`. 11 unit tests cover detection edges, bin-pack output, and the four cartesian shapes (split×split, split×passthrough, passthrough×split, passthrough×passthrough).

Integration tests (`state/aqe/test/skew_join_rule.rs`): 6 end-to-end cases through `AdaptivePlanner`: single-side skew on SMJ and HashJoin(Partitioned), disabled, below-threshold guard, uniform-bytes guard, and a final-stage shuffle-reader check that K'=7 partitions flow through the adapter into the runnable plan.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
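The detection, split, and pairing steps described in this commit can be sketched as follows. This is a simplified illustration, not the contents of `skew_join/algorithm.rs`: `SkewJoinConfig`, `median`, `map_ranges`, and `pair_ranges` are hypothetical names, the defaults are the ones listed for the config knobs in the first commit, and the bin-packer is a plain greedy pass that omits the small-partition tail-merge.

```rust
const MIB: u64 = 1024 * 1024;

/// Assumed shape for the knobs; defaults taken from the first commit message.
pub struct SkewJoinConfig {
    pub skewed_partition_factor: f64,
    pub skewed_partition_threshold_bytes: u64,
    pub advisory_partition_bytes: u64,
}

impl Default for SkewJoinConfig {
    fn default() -> Self {
        Self {
            skewed_partition_factor: 5.0,
            skewed_partition_threshold_bytes: 256 * MIB,
            advisory_partition_bytes: 64 * MIB,
        }
    }
}

/// Median of the per-partition sizes on one side (0 for an empty side).
pub fn median(sizes: &[u64]) -> u64 {
    if sizes.is_empty() {
        return 0;
    }
    let mut sorted = sizes.to_vec();
    sorted.sort_unstable();
    let mid = sorted.len() / 2;
    if sorted.len() % 2 == 0 {
        // Widen before adding so two large sizes cannot overflow.
        ((sorted[mid - 1] as u128 + sorted[mid] as u128) / 2) as u64
    } else {
        sorted[mid]
    }
}

/// Dual guard: larger than factor * median AND larger than the threshold.
pub fn is_skewed(size: u64, median: u64, cfg: &SkewJoinConfig) -> bool {
    (size as f64) > (median as f64) * cfg.skewed_partition_factor
        && size > cfg.skewed_partition_threshold_bytes
}

/// Greedy contiguous packing of per-mapper sizes into [start, end) ranges
/// of roughly `target` bytes each (no tail-merge in this sketch).
pub fn map_ranges(map_sizes: &[u64], target: u64) -> Vec<(usize, usize)> {
    let mut ranges = Vec::new();
    let (mut start, mut current) = (0usize, 0u64);
    for (i, &size) in map_sizes.iter().enumerate() {
        if i > start && current.saturating_add(size) > target {
            ranges.push((start, i));
            start = i;
            current = 0;
        }
        current = current.saturating_add(size);
    }
    if start < map_sizes.len() {
        ranges.push((start, map_sizes.len()));
    }
    ranges
}

/// Cartesian pairing: every left range is matched with every right range.
pub fn pair_ranges(
    left: &[(usize, usize)],
    right: &[(usize, usize)],
) -> Vec<((usize, usize), (usize, usize))> {
    left.iter()
        .flat_map(|&l| right.iter().map(move |&r| (l, r)))
        .collect()
}
```

A skewed left partition split into two ranges and paired with a passthrough right partition (one range covering all mappers) gives two output pairs, which is the split×passthrough shape from the unit tests.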
…ynamic-filter mutual exclusion

After investigating whether DataFusion's `SortMergeJoinExec` needs an `is_skew_join` flag analogous to Spark's, the answer is: no flag is needed on the join itself (DF's SMJ has no global per-key invariant the rewrite violates). But two correctness gaps in the adapter / rule pipeline did surface, and this commit closes both.

B-fix (adapter, always-on): the skew_join adapter arm now returns `Partitioning::UnknownPartitioning(K')` instead of `Hash(keys, K')`. Splitting a hash bucket across multiple downstream partitions means the same key now lives in multiple partitions, so the data is not truly hash-partitioned at K'. Claiming `Hash` would mislead downstream operators (other joins, hash aggregates, DataFusion's per-partition dynamic-filter routing) into trust they shouldn't have. `UnknownPartitioning` is the honest declaration; within the current stage the join above still works because each task receives a properly-paired (left-shard, right-shard) bundle.

D-fix (rule, mutual exclusion): DataFusion 53.1's HashJoin dynamic filter pushdown builds a CASE expression keyed on `hash(join_keys) % K'` that routes each probe row to one partition's bounds. The skew rewrite intentionally violates that hash-co-location invariant, so the routed CASE would filter out probe rows whose matches live in a different partition → silent wrong results. The default for `optimizer.enable_join_dynamic_filter_pushdown` in DF 53.1 is true, so this is a today-problem, not future-proofing.

Picked mutual exclusion (option 3 of 4) for v1: when the DF option is on, the rule refuses to fire with a clear log line. Users opt into one or the other, not both.

Alternatives (all documented in the rule's source comment):
- documentation only (rejected, DF default is on)
- plan-mutation to clear dynamic filters (more invasive)
- upstream DF change to make per-partition CASE skew-aware (tracked at ~/mydocs/datafusion/aqe-tasks/11-skew-compatible-hash-join-dynamic-filter.md as a follow-up)

Tests: existing five fire/non-fire skew_join tests pre-set `enable_join_dynamic_filter_pushdown=false` (via a comment in `skew_join_context`); one new test `should_bail_when_dynamic_filter_pushdown_enabled` covers the mutual-exclusion bail path. The shuffle-reader integration test now expects `UnknownPartitioning(7)` instead of `Hash([c@1], 7)`, with a docstring explaining why the rewrite must declare unknown partitioning. All 70 AQE tests pass (63 prior + 6 from C3 + 1 new in C4).

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
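A minimal sketch of the mutual-exclusion guard, assuming the rule sees both settings as plain booleans. `RuleDecision` and `decide` are hypothetical names for the example; the real rule reads the values from the session config and logs when it bails.

```rust
/// Hypothetical outcome of the rule's up-front checks.
#[derive(Debug, PartialEq, Eq)]
enum RuleDecision {
    Fire,
    SkipDisabled,
    SkipDynamicFilterPushdown,
}

/// `skew_join_enabled` stands for `ballista.planner.skew_join.enabled`;
/// `join_dynamic_filter_pushdown` stands for DataFusion's
/// `optimizer.enable_join_dynamic_filter_pushdown`.
fn decide(skew_join_enabled: bool, join_dynamic_filter_pushdown: bool) -> RuleDecision {
    if !skew_join_enabled {
        return RuleDecision::SkipDisabled;
    }
    // The routed dynamic filter assumes hash co-location, which the skew
    // rewrite breaks, so the rule refuses to fire while pushdown is on.
    if join_dynamic_filter_pushdown {
        return RuleDecision::SkipDynamicFilterPushdown;
    }
    RuleDecision::Fire
}
```

Returning a distinct variant for the pushdown case, rather than a bare `false`, leaves room for the caller to emit the specific log line the commit message mentions.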
Member
@wirybeaver Please resolve the conflicts!
Summary
Ports Spark's `OptimizeSkewedJoin` AQE rule into Ballista. Replaces the previously-attempted file-list sharding approach (closed in #1718) with Spark's split-and-replicate pattern, which directly addresses the highest-value skew shape (skewed joins) and is correct by construction without needing aggregate-aware rewrites or salting.

Draft: landing incrementally as a stack of commits on this single PR:
- `BallistaConfig` knobs + `SessionConfigExt` accessors + `SkewJoinPlan` / `SkewJoinShard` carrier types. No rule, no behavior change.
- `ShuffleReaderExec::try_new_skew_join` + adapter wiring.
- `OptimizeSkewedJoinRule`: detection, bin-pack split, cartesian replication, join-type allowlist, bilateral attach.
- `is_skew_join` flag for `SortMergeJoinExec` (may require an upstream DataFusion patch).

Design
Concept mapping (full design in PR description as it evolves):
- `OptimizeSkewedJoin` → `OptimizeSkewedJoinRule` in `state/aqe/optimizer_rule/`
- `MapOutputStatistics` → `ExchangeExec::shuffle_partitions()` + `PartitionLocation::partition_stats.num_bytes()`
- `PartialReducerPartitionSpec(reducerIdx, [startMap, endMap), size)` → `SkewJoinShard { upstream_idx, start_map_idx, end_map_idx }`
- `Vec<SkewJoinShard>` of length `K'` on both legs
- `isSkewJoin = true` flag → `SortMergeJoinExec`

Defaults match Spark; the rule is opt-in (`ballista.planner.skew_join.enabled` defaults to `false`).

Why split+replicate over file-list sharding (the closed approach)
File-list sharding (#1718) had to bail on any `HashPartitioned` / `SinglePartition` consumer (joins, `FinalPartitioned` aggregates, global limits) because it scattered rows that the consumer assumed colocated. That covered most realistic workloads. Spark's split+replicate pattern handles the most common skew shape (joins) without that limitation: replicating the non-skewed side N times preserves the join's correctness while distributing the skewed side's work across N tasks.

Test plan
- `cargo check --workspace` passes (done locally).
- … `(upstream_idx, map_range)` tuples correctly.

Closes #1718 in favor of this approach.
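To illustrate the split+replicate argument from the design rationale, the toy example below joins a skewed side in N contiguous chunks, each against a full copy of the other side, and produces the same rows as a single inner join. It is a self-contained sketch over in-memory `(key, payload)` tuples with hypothetical helper names, covers only inner-join semantics, and does not use any Ballista or DataFusion types.

```rust
type Row = (u32, u32); // (join key, payload)

/// Plain nested-loop inner join on the key.
fn inner_join(left: &[Row], right: &[Row]) -> Vec<(u32, u32, u32)> {
    let mut out = Vec::new();
    for &(lk, lv) in left {
        for &(rk, rv) in right {
            if lk == rk {
                out.push((lk, lv, rv));
            }
        }
    }
    out
}

/// Splits the skewed side into at most `n` contiguous chunks and joins each
/// chunk against the whole (replicated) other side, as N separate tasks would.
fn split_and_replicate_join(skewed: &[Row], other: &[Row], n: usize) -> Vec<(u32, u32, u32)> {
    if skewed.is_empty() {
        return Vec::new();
    }
    let n = n.max(1);
    let chunk_len = (skewed.len() + n - 1) / n; // ceiling division, >= 1
    skewed
        .chunks(chunk_len)
        .flat_map(|chunk| inner_join(chunk, other))
        .collect()
}
```

Every row of the skewed side appears in exactly one chunk and sees every row of the other side, so no match is lost or duplicated; the cost is reading the non-skewed side N times.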