feat(aqe): port Spark's OptimizeSkewedJoin #1779

Draft
wirybeaver wants to merge 4 commits into apache:main from wirybeaver:optimize-skewed-join

@wirybeaver

Summary

Ports Spark's OptimizeSkewedJoin AQE rule into Ballista. Replaces the previously-attempted file-list sharding approach (closed in #1718) with Spark's split-and-replicate pattern, which directly addresses the highest-value skew shape (skewed joins) and is correct by construction without needing aggregate-aware rewrites or salting.

Draft — landing incrementally as a stack of commits on this single PR:

  • C1 (this commit): BallistaConfig knobs + SessionConfigExt accessors + SkewJoinPlan / SkewJoinShard carrier types. No rule, no behavior change.
  • C2: ShuffleReaderExec::try_new_skew_join + adapter wiring.
  • C3: OptimizeSkewedJoinRule — detection, bin-pack split, cartesian replication, join-type allowlist, bilateral attach.
  • C4: is_skew_join flag for SortMergeJoinExec (may require an upstream DataFusion patch).
  • C5: TPC-H end-to-end synthetic-skew test.

Design

Concept mapping (full design in PR description as it evolves):

| Spark | Ballista equivalent |
| --- | --- |
| OptimizeSkewedJoin | OptimizeSkewedJoinRule in state/aqe/optimizer_rule/ |
| MapOutputStatistics | ExchangeExec::shuffle_partitions() + PartitionLocation::partition_stats.num_bytes() |
| PartialReducerPartitionSpec(reducerIdx, [startMap, endMap), size) | SkewJoinShard { upstream_idx, start_map_idx, end_map_idx } |
| Cartesian pair-up across legs | Pre-computed Vec<SkewJoinShard> of length K' on both legs |
| isSkewJoin = true flag | TBD; may need a flag on SortMergeJoinExec |

Defaults match Spark. The rule is opt-in: ballista.planner.skew_join.enabled defaults to false.
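As a rough picture of the knob set, here is a minimal sketch with the default values listed in the C1 commit message. `SkewJoinKnobs` is a hypothetical stand-in for illustration only; the real settings live in `BallistaConfig` and are read through `SessionConfigExt` accessors whose exact names are not shown in this PR description.

```rust
/// Hypothetical mirror of the skew-join knobs (not the actual BallistaConfig type).
#[derive(Debug, Clone, PartialEq)]
pub struct SkewJoinKnobs {
    /// ballista.planner.skew_join.enabled: the rule is off unless set to true.
    pub enabled: bool,
    /// A partition must exceed factor * median to count as skewed.
    pub skewed_partition_factor: f64,
    /// A partition must also exceed this absolute size to count as skewed.
    pub skewed_partition_threshold_bytes: u64,
    /// Target sub-shard size for the bin-packer.
    pub advisory_partition_bytes: u64,
    /// Tail-merge factor for small trailing sub-shards.
    pub small_partition_factor: f64,
}

const MIB: u64 = 1024 * 1024;

impl Default for SkewJoinKnobs {
    fn default() -> Self {
        Self {
            enabled: false,
            skewed_partition_factor: 5.0,
            skewed_partition_threshold_bytes: 256 * MIB,
            advisory_partition_bytes: 64 * MIB,
            small_partition_factor: 0.2,
        }
    }
}
```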

Why split+replicate over file-list sharding (the closed approach)

File-list sharding (#1718) had to bail on any HashPartitioned / SinglePartition consumer (joins, FinalPartitioned aggregates, global limits) because it scattered rows that the consumer assumed were colocated. Those bail-out cases cover most realistic workloads. Spark's split+replicate pattern handles the most common skew shape (joins) without that limitation: replicating the non-skewed side N times preserves the join's correctness while distributing the skewed side's work across N tasks.
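To see why replication keeps an inner join correct, the toy sketch below joins one reducer partition two ways: once as a whole, and once with the skewed side cut into chunks that are each joined against a full copy of the other side. The row type, the nested-loop join, and both function names are illustrative assumptions, not code from this PR.

```rust
/// One row: (join key, payload).
pub type Row = (u32, u32);

/// Nested-loop inner join on the key; output is (key, left payload, right payload).
pub fn inner_join(left: &[Row], right: &[Row]) -> Vec<(u32, u32, u32)> {
    let mut out = Vec::new();
    for &(lk, lv) in left {
        for &(rk, rv) in right {
            if lk == rk {
                out.push((lk, lv, rv));
            }
        }
    }
    out
}

/// Splits the skewed side into chunks of at most `chunk_len` rows, joins each
/// chunk against the full (replicated) other side, and concatenates the results.
pub fn split_and_replicate_join(
    skewed: &[Row],
    other: &[Row],
    chunk_len: usize,
) -> Vec<(u32, u32, u32)> {
    skewed
        .chunks(chunk_len.max(1)) // guard: `chunks(0)` would panic
        .flat_map(|chunk| inner_join(chunk, other))
        .collect()
}
```

Every skewed-side row lands in exactly one chunk and sees every other-side row, so each matching pair is produced exactly once. In the real rule each chunk would run as its own task.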

Test plan

  • C1: cargo check --workspace passes (done locally).
  • C2: synthetic adapter test — reader serves paired (upstream_idx, map_range) tuples correctly.
  • C3: rule unit tests (detection edge cases, bin-pack, allowlist, bilateral attach); SQL-level integration tests with synthetic skewed PartitionLocation stats.
  • C4: integration test with skewed sort-merge join.
  • C5: TPC-H end-to-end ≥30% wall-clock improvement on a synthetically-skewed join key vs rule disabled; row-identical results.
  • Regression: existing coalesce tests stay green; full TPC-H suite produces row-identical results with rule on vs off.

Closes #1718 in favor of this approach.

wirybeaver and others added 4 commits May 26, 2026 22:43
Lays the foundation for porting Spark's OptimizeSkewedJoin AQE rule. This
commit adds only data — no rule, no plan-tree integration, no behavior
change. Subsequent commits wire the carrier into ShuffleReaderExec / the
adapter (C2) and add the rule itself (C3).

- `BallistaConfig` knobs (the rule is opt-in; `skew_join.enabled` defaults to `false`):
  - `skew_join.skewed_partition_factor` (5.0) — Spark's
    `skewedPartitionFactor`
  - `skew_join.skewed_partition_threshold_bytes` (256 MiB) — Spark's
    `skewedPartitionThresholdInBytes`
  - `skew_join.advisory_partition_bytes` (64 MiB) — target sub-shard size
    for the bin-packer; decoupled from `coalesce.target_partition_bytes`
    so the two rules tune independently
  - `skew_join.small_partition_factor` (0.2) — tail-merge factor from
    Spark's `splitSizeListByTargetSize` legacy
- `SessionConfigExt` accessor pairs for each knob (mirrors the existing
  coalesce-config accessor pattern).
- `SkewJoinPlan` / `SkewJoinShard` carrier types in shuffle_reader.rs,
  mirroring `CoalescePlan` / `PartitionGroup`. `SkewJoinShard` encodes
  Spark's `PartialReducerPartitionSpec(reducerIdx, startMapIdx,
  endMapIdx)` semantics. Plan attaches to both legs of a join's
  alignment group; `shards.len() == K'` matches across legs so the
  planner builds matched join inputs by zipping the two legs.
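
A minimal sketch of the carrier shape described above. The field names come from the concept-mapping table; the field types, the derives, and the `zip_legs` helper are assumptions for illustration and may differ from what lives in shuffle_reader.rs.

```rust
/// One output shard: mappers [start_map_idx, end_map_idx) of one upstream
/// reducer partition (end is exclusive).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkewJoinShard {
    pub upstream_idx: usize,
    pub start_map_idx: usize,
    pub end_map_idx: usize,
}

/// Per-leg plan; `shards.len()` is K' and must match on both legs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkewJoinPlan {
    pub shards: Vec<SkewJoinShard>,
}

/// Hypothetical helper: pairs shard i of the left leg with shard i of the
/// right leg, or returns None when the legs disagree on K'.
pub fn zip_legs<'a>(
    left: &'a SkewJoinPlan,
    right: &'a SkewJoinPlan,
) -> Option<Vec<(&'a SkewJoinShard, &'a SkewJoinShard)>> {
    if left.shards.len() != right.shards.len() {
        return None;
    }
    Some(left.shards.iter().zip(right.shards.iter()).collect())
}
```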

`cargo check --workspace` clean.
…rExec / adapter

Adds the plumbing layer that lets a future rule attach a per-stage skew-join
decision and have the adapter materialize it into a shuffle reader with
per-mapper sub-range slicing. Still no rule and no production behavior
change — `set_skew_join` is only called by the new unit tests.

- `ExchangeExec`: new `skew_join` slot (mirror of the existing `coalesce`
  slot). `set_skew_join` / `skew_join()` accessors, threaded through
  `with_new_children`, and rendered in EXPLAIN as `skew_join=K' of M`.
- `ShuffleReaderExec`: new `skew_join: Option<SkewJoinPlan>` field plus a
  `try_new_skew_join` constructor mirroring `try_new_coalesced`. Debug
  asserts on K'-shape invariants. Threaded through `with_work_dir`,
  `with_client_pool`, `with_new_children`, and EXPLAIN.
- `BallistaAdapter::transform_children`: extended from 2-arm to 4-arm
  match on `(coalesce, skew_join)`:
  - `(Some, Some)` → exec_err — mutually exclusive by construction
  - `(Some, None)` → existing coalesce path
  - `(None, Some(sp))` → slice each upstream's `Vec<PartitionLocation>`
    to the shard's `[start_map_idx, end_map_idx)` window, preserve hash
    partitioning width at K' (the eventual `is_skew_join` flag on the
    join op in C4 is what relaxes "same key in one partition")
  - `(None, None)` → unchanged
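
The four arms can be sketched as below. `ReaderKind` and `choose_reader` are hypothetical names, and each plan is reduced to its output partition count so the example stays self-contained; the real adapter builds `ShuffleReaderExec` instances and reports the error through `exec_err`.

```rust
/// Which reader the adapter would build (illustrative stand-in).
#[derive(Debug, PartialEq, Eq)]
pub enum ReaderKind {
    Plain,
    Coalesced(usize),
    SkewJoin(usize),
}

/// Dispatches on the two carrier slots; each `usize` stands in for a plan
/// and holds its output partition count.
pub fn choose_reader(
    coalesce: Option<usize>,
    skew_join: Option<usize>,
) -> Result<ReaderKind, String> {
    match (coalesce, skew_join) {
        // Both slots set: the rules should never produce this.
        (Some(_), Some(_)) => {
            Err("coalesce and skew_join are mutually exclusive".to_string())
        }
        (Some(groups), None) => Ok(ReaderKind::Coalesced(groups)),
        (None, Some(shards)) => Ok(ReaderKind::SkewJoin(shards)),
        (None, None) => Ok(ReaderKind::Plain),
    }
}
```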

Tests in `adapter.rs::tests`:
- `adapter_slices_skew_join_shards_by_map_range`: builds a 3×4 synthetic
  upstream, splits upstream 0 two-way, upstream 1 four-way, leaves
  upstream 2 as passthrough; asserts K'=7 with the exact
  `(upstream_partition_id, map_partition_id)` pairs per output shard.
- `adapter_errors_when_both_coalesce_and_skew_join_set`: regression
  guard for the mutual-exclusion invariant.
- `adapter_guards_out_of_bounds_skew_join_indices`: out-of-bounds
  `upstream_idx` errors clearly; `end_map_idx > inner.len()` clamps;
  `start >= end` yields an empty shard.
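
The guard behavior these tests pin down can be sketched as a single slicing function. `Shard` and `slice_shard` are hypothetical names, and the generic `T` stands in for `PartitionLocation`; only the three guard outcomes (error, clamp, empty) are taken from the test descriptions above.

```rust
/// Illustrative shard descriptor; `end_map_idx` is exclusive.
#[derive(Debug, Clone, Copy)]
pub struct Shard {
    pub upstream_idx: usize,
    pub start_map_idx: usize,
    pub end_map_idx: usize,
}

/// Returns the per-mapper entries of one upstream partition that fall in the
/// shard's [start_map_idx, end_map_idx) window.
pub fn slice_shard<T: Clone>(upstreams: &[Vec<T>], shard: Shard) -> Result<Vec<T>, String> {
    // An unknown upstream partition is an error rather than a silent no-op.
    let inner = upstreams.get(shard.upstream_idx).ok_or_else(|| {
        format!(
            "upstream_idx {} out of bounds ({} upstream partitions)",
            shard.upstream_idx,
            upstreams.len()
        )
    })?;
    // Clamp the end to the number of mappers, then clamp the start to the
    // end so that start >= end yields an empty shard instead of a panic.
    let end = shard.end_map_idx.min(inner.len());
    let start = shard.start_map_idx.min(end);
    Ok(inner[start..end].to_vec())
}
```

With a 3×4 upstream, splitting upstream 0 two ways, upstream 1 four ways, and passing upstream 2 through gives 2 + 4 + 1 = 7 output shards, which is where K'=7 in the first test comes from.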

`cargo check --workspace` clean; full AQE suite (52 tests) passes.
…te matching side

Ports Spark's OptimizeSkewedJoin into Ballista's AQE pipeline. When one side
of a binary join has a partition whose bytes exceed BOTH
factor * median(per-side sizes) AND an absolute threshold, the rule splits
that side's per-mapper byte vector into contiguous [start_map_idx, end_map_idx)
ranges (bin-packed near advisory_partition_bytes via the coalesce module's
split_size_list_by_target_size helper) and cartesian-pairs them against the
other side's ranges. Both sides' resulting paired SkewJoinPlans land on the
two leaf ExchangeExecs via the carrier slots from C2; the adapter consumes
them to build K'-partition ShuffleReaderExecs with per-mapper sub-range
slicing.
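
The detection and split steps can be sketched as follows. This is a simplified illustration, not the PR's code: `median`, `is_skewed`, and `split_by_target` are assumed names and signatures, and the splitter is a plain greedy pass that leaves out the small_partition_factor tail merge performed by split_size_list_by_target_size.

```rust
/// Median of the sizes (lower-rounded mean of the two middle values for even
/// lengths); 0 for an empty slice.
pub fn median(sizes: &[u64]) -> u64 {
    if sizes.is_empty() {
        return 0;
    }
    let mut sorted = sizes.to_vec();
    sorted.sort_unstable();
    let mid = sorted.len() / 2;
    if sorted.len() % 2 == 1 {
        sorted[mid]
    } else {
        // sorted[mid] >= sorted[mid - 1], so this cannot underflow or overflow.
        sorted[mid - 1] + (sorted[mid] - sorted[mid - 1]) / 2
    }
}

/// Dual guard: the partition must exceed BOTH factor * median AND the
/// absolute threshold.
pub fn is_skewed(size: u64, median: u64, factor: f64, threshold_bytes: u64) -> bool {
    (size as f64) > (median as f64) * factor && size > threshold_bytes
}

/// Greedy contiguous split of per-mapper sizes into [start, end) ranges,
/// closing a range when adding the next mapper would exceed `target`.
pub fn split_by_target(map_sizes: &[u64], target: u64) -> Vec<(usize, usize)> {
    let mut ranges = Vec::new();
    let mut start = 0;
    let mut acc = 0u64;
    for (i, &size) in map_sizes.iter().enumerate() {
        if i > start && acc.saturating_add(size) > target {
            ranges.push((start, i));
            start = i;
            acc = 0;
        }
        acc = acc.saturating_add(size);
    }
    if start < map_sizes.len() {
        ranges.push((start, map_sizes.len()));
    }
    ranges
}
```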

Applies Spark's per-join-type split-side allowlist (Inner: both, Left/LeftSemi/
LeftAnti/LeftMark: left only, Right: right only, Full: neither). v1 handles
exactly one binary join per stage subtree; multi-join stages bail (Spark
iterates per-join, a follow-up can do the same). Default off via
ballista.planner.skew_join.enabled.
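
The allowlist above, written out as a lookup. `JoinKind` is a local enum limited to the join types this commit message names (it is not DataFusion's `JoinType`), and `splittable_sides` is a hypothetical helper name.

```rust
/// Join types named in the allowlist above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinKind {
    Inner,
    Left,
    LeftSemi,
    LeftAnti,
    LeftMark,
    Right,
    Full,
}

/// Returns (can_split_left, can_split_right).
pub fn splittable_sides(kind: JoinKind) -> (bool, bool) {
    match kind {
        JoinKind::Inner => (true, true),
        JoinKind::Left | JoinKind::LeftSemi | JoinKind::LeftAnti | JoinKind::LeftMark => {
            (true, false)
        }
        JoinKind::Right => (false, true),
        JoinKind::Full => (false, false),
    }
}
```

Splitting one side means replicating the other. A side whose unmatched rows are emitted by the join cannot be replicated without duplicating those rows, which is why outer-style joins only allow splitting the preserved side.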

Wired after CoalescePartitionsRule in planner.rs::actionable_stages() — the
two carriers are mutually exclusive, so running coalesce first lets skew-join
idempotently short-circuit on already-claimed leaves.

Algorithm helpers (skew_join/algorithm.rs): is_skewed dual-guard detection,
map_ranges_for_upstream bin-packer (reuses coalesce's helper), pair_shards
cartesian product, robust_median. 11 unit tests cover detection edges,
bin-pack output, and the four cartesian shapes (split×split,
split×passthrough, passthrough×split, passthrough×passthrough).
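
A minimal sketch of the cartesian pairing for a single upstream reducer partition. `MapRange` and `pair_ranges` are illustrative names rather than the signature of `pair_shards`; a passthrough side is modeled as one range covering all mappers.

```rust
/// A contiguous [start, end) window of mapper indices.
pub type MapRange = (usize, usize);

/// Pairs every left range with every right range. The two output legs are
/// the first and second components of the returned pairs, so they always
/// have the same length.
pub fn pair_ranges(left: &[MapRange], right: &[MapRange]) -> Vec<(MapRange, MapRange)> {
    let mut out = Vec::with_capacity(left.len() * right.len());
    for &l in left {
        for &r in right {
            out.push((l, r));
        }
    }
    out
}
```

Split×passthrough therefore yields as many pairs as the split side has ranges, and passthrough×passthrough yields exactly one pair, which leaves the partition unchanged.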

Integration tests (state/aqe/test/skew_join_rule.rs): 6 end-to-end cases
through AdaptivePlanner — single-side skew on SMJ and HashJoin(Partitioned),
disabled, below-threshold guard, uniform-bytes guard, and a final-stage
shuffle-reader check that K'=7 partitions flow through the adapter into
the runnable plan.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…ynamic-filter mutual exclusion

After investigating whether DataFusion's SortMergeJoinExec needs an
is_skew_join flag analogous to Spark's, the answer is: no flag is needed
on the join itself (DF's SMJ has no global per-key invariant the rewrite
violates). But two correctness gaps in the adapter / rule pipeline did
surface, and this commit closes both.

B-fix (adapter, always-on): the skew_join adapter arm now returns
Partitioning::UnknownPartitioning(K') instead of Hash(keys, K'). Splitting
a hash bucket across multiple downstream partitions means the same key now
lives in multiple partitions, so the data is not truly hash-partitioned at
K'. Claiming Hash would mislead downstream operators (other joins, hash
aggregates, DataFusion's per-partition dynamic-filter routing) into relying on
a co-location guarantee that no longer holds. UnknownPartitioning is the honest declaration; within
the current stage the join above still works because each task receives a
properly-paired (left-shard, right-shard) bundle.
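
The broken invariant can be checked with a toy helper. `keys_are_colocated` is a hypothetical function, and partitions are modeled as plain lists of integer keys; it only illustrates the guarantee a Hash declaration implies (every key lives in exactly one partition).

```rust
use std::collections::HashMap;

/// True when every key appears in at most one partition.
pub fn keys_are_colocated(partitions: &[Vec<u64>]) -> bool {
    let mut home: HashMap<u64, usize> = HashMap::new();
    for (idx, part) in partitions.iter().enumerate() {
        for &key in part {
            // The first sighting records the key's home partition; a later
            // sighting in a different partition breaks co-location.
            if *home.entry(key).or_insert(idx) != idx {
                return false;
            }
        }
    }
    true
}
```

Before the rewrite, a layout such as `[[0, 0, 0, 2], [1, 3]]` passes the check. After splitting the first bucket into `[[0, 0], [0, 2], [1, 3]]`, key 0 sits in two partitions and the check fails, so declaring Hash at K'=3 would be false.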

D-fix (rule, mutual exclusion): DataFusion 53.1's HashJoin dynamic filter
pushdown builds a CASE expression keyed on hash(join_keys) % K' that routes
each probe row to one partition's bounds. The skew rewrite intentionally
violates that hash-co-location invariant, so the routed CASE would filter
out probe rows whose matches live in a different partition → silent wrong
results. The default for optimizer.enable_join_dynamic_filter_pushdown in
DF 53.1 is true, so this is a today-problem, not future-proofing.
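
A toy model of the failure mode, under loud assumptions: keys are integers, the hash is the identity, and each partition's dynamic filter is reduced to inclusive min/max bounds. `routed_filter_keeps` is a hypothetical function; the actual DataFusion expression is more involved, but the routing step has the same shape.

```rust
/// Inclusive (min, max) bounds of the build-side keys seen by one partition.
pub type Bounds = (u64, u64);

/// Routes a probe key to ONE partition's bounds via `key % K'` (identity
/// hash, an assumption) and keeps the row only if it falls inside them.
pub fn routed_filter_keeps(key: u64, bounds: &[Bounds]) -> bool {
    if bounds.is_empty() {
        return true; // no filter to apply
    }
    let (lo, hi) = bounds[(key % bounds.len() as u64) as usize];
    lo <= key && key <= hi
}
```

Suppose the upstream had two reducers and the even-key reducer was split in two, giving K'=3 partitions with bounds `[(0, 2), (0, 2), (7, 9)]`. Probe key 2 has matches in partitions 0 and 1, but `2 % 3 == 2` routes it to the bounds `(7, 9)`, and the row is dropped.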

Picked mutual exclusion (option 3 of 4) for v1: when the DF option is on,
the rule refuses to fire with a clear log line. Users opt into one or the
other, not both. Alternatives — documentation only (rejected, DF default
is on), plan-mutation to clear dynamic filters (more invasive), and
upstream DF change to make per-partition CASE skew-aware (tracked at
~/mydocs/datafusion/aqe-tasks/11-skew-compatible-hash-join-dynamic-filter.md
as a follow-up) — are all documented in the rule's source comment.
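
The v1 guard amounts to a two-step check, sketched below. `RuleDecision` and `decide` are hypothetical names; the real rule reads both settings from the session config and logs the bail reason.

```rust
/// Outcome of the pre-flight check (illustrative).
#[derive(Debug, PartialEq, Eq)]
pub enum RuleDecision {
    Fire,
    Skip(&'static str),
}

/// `skew_join_enabled` stands in for ballista.planner.skew_join.enabled and
/// `dynamic_filter_pushdown` for optimizer.enable_join_dynamic_filter_pushdown.
pub fn decide(skew_join_enabled: bool, dynamic_filter_pushdown: bool) -> RuleDecision {
    if !skew_join_enabled {
        return RuleDecision::Skip("skew join rule is disabled");
    }
    if dynamic_filter_pushdown {
        // Firing here could silently drop probe rows, so refuse instead.
        return RuleDecision::Skip("join dynamic filter pushdown is enabled");
    }
    RuleDecision::Fire
}
```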

Tests: the five existing fire/non-fire skew_join tests now pre-set
enable_join_dynamic_filter_pushdown=false (in skew_join_context, with a comment
explaining why); one new test, should_bail_when_dynamic_filter_pushdown_enabled,
covers the mutual-exclusion bail path. The shuffle-reader integration test
now expects UnknownPartitioning(7) instead of Hash([c@1], 7), with a
docstring explaining why the rewrite must declare unknown partitioning.

All 70 AQE tests pass (63 prior + 6 from C3 + 1 new in C4).

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@martin-g
Member

martin-g commented May 27, 2026

@wirybeaver Please resolve the conflicts!
Update: Sorry, I didn't notice that it is still a draft!
