feat(aggregate): cost-aware partial-aggregation skip (opt-in)#22518
feat(aggregate): cost-aware partial-aggregation skip (opt-in)#22518zhuqi-lucas wants to merge 13 commits into
Conversation
The fixed `skip_partial_aggregation_probe_ratio_threshold` (default 0.8) catches "the partial agg barely reduces anything" cases, but it misses the band where the ratio is moderate (say 0.5-0.6) and partial aggregation is *still* net-negative because per-row cost is high — heavy variable- length keys, complex aggregates, etc. ClickBench Q18 is the motivating example (issue apache#22405): ratio 0.565, but partial agg burns 17s of compute across 12 partitions while reducing input only ~40%; turning the threshold down enough to catch it would regress lower-cost queries. Add a second, opt-in skip rule that augments the fixed-ratio check with the measured per-row wall time of the operator. Disabled by default, so existing behaviour is preserved. New config (all under `datafusion.execution`): - `skip_partial_aggregation_use_cost_model` (bool, default false) — turns the cost-aware rule on. - `skip_partial_aggregation_cost_ns_per_row` (u64, default 1000) — the per-row wall-time floor above which the cost-aware rule fires. - `skip_partial_aggregation_cost_min_ratio` (f64, default 0.3) — below this ratio partial agg is kept regardless of per-row cost (it's reducing too much to be worth skipping). How it works: `SkipAggregationProbe` already runs at probe-window boundaries and already has `baseline_metrics.elapsed_compute` ticking through every timed block. The probe now snapshots that counter at construction; once `probe_rows_threshold` is reached, it computes `ns_per_row = (elapsed_compute - snapshot) / input_rows` and, if both the per-row cost is above the floor and the ratio sits in the medium band, switches to skip mode. The existing high-ratio rule still fires first, so this is purely additive. Five unit tests on `SkipAggregationProbe` cover the new branches — cost-model-off matches the legacy ratio check, medium-ratio + high cost skips, below-min-ratio doesn't, cheap-per-row doesn't, and the high-ratio rule is honoured even with the cost model on. Refs: apache#22405
|
run benchmarks |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/adaptive-partial-agg-cost (88f6d4c) to a87bdc9 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/adaptive-partial-agg-cost (88f6d4c) to a87bdc9 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/adaptive-partial-agg-cost (88f6d4c) to a87bdc9 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
Temporarily set `skip_partial_aggregation_use_cost_model` default = true so the benchmark bot actually exercises the new code path. **Revert this commit before merge** — final default should remain false (opt-in) until ClickBench-wide validation tunes the constants. Regenerated: - docs/source/user-guide/configs.md - datafusion/sqllogictest/test_files/information_schema.slt (SHOW ALL added the new 3 config rows; CI was failing on the stale expectation).
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/adaptive-partial-agg-cost (9940d8a) to a87bdc9 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
The 1000 ns/row threshold was wishful thinking. Re-derived per-row cost on the benchmark hardware (Neoverse-V2 ARM): for ClickBench Q18, partial agg costs ~100-200 ns/row, well below 1000. M-series MBP from the issue report is similar (~170 ns/row reading back from the 17 s / 100 M figure). 100 ns/row is roughly the floor of a hash-table probe + insert on modern CPUs, so anything above that is in the "meaningful per-row overhead" band where partial agg can plausibly be net-negative. The 0.3 cost_min_ratio guard keeps low-cardinality / high-reduction queries (like ClickBench Q35) safe — they sit below 0.3 and never enter this branch regardless of per-row cost.
|
run benchmark clickbench |
|
run benchmark clickbench_partitioned |
|
🤖 Criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/adaptive-partial-agg-cost (5c10375) to bdf8a6d (merge-base) diff File an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/adaptive-partial-agg-cost (5c10375) to bdf8a6d (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
Round 1 (cost_ns_per_row > 1000) didn't fire on Q18 because partial agg
per-row cost at probe close (first 100k rows, hash table still small)
is ~100-200 ns on the ARM bot, not 1000+. Lowering to 100 didn't help
either — the measured cost at probe time underestimates the eventual
asymptotic cost, so a single-shot probe-time threshold is fundamentally
fragile.
Pivot:
- Drop `skip_partial_aggregation_cost_ns_per_row` entirely from the
decision. Rule 2 is now a pure ratio check: skip when
`ratio >= cost_min_ratio` (default 0.5).
- This matches the empirical finding in the issue body: ratio_threshold
= 0.6 makes Q18 1.73× faster on M-series. 0.5 is conservative around
that — the 0.3 cost_min_ratio guard from before is gone.
- Add two diagnostic gauges (always recorded, regardless of which rule
fires):
* `partial_agg_probe_ns_per_row` — measured per-row wall time
* `partial_agg_probe_ratio_per_mille` — ratio × 1000
EXPLAIN ANALYZE shows these so we can revisit a real cost-aware rule
later with actual numbers instead of guessing thresholds.
Why keep `use_cost_model` as the flag name even though it isn't
cost-aware anymore: the gauges (the basis for a future cost-aware
rule) ride alongside, and we want a single opt-in surface that
graduates from "lower ratio threshold" to "cost-aware" without
churning configs.
Unit tests rewritten to match: 5 tests covering off/on, fires/doesn't
fire, fixed-rule precedence, and gauge recording.
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/adaptive-partial-agg-cost (e6a98fe) to bdf8a6d (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
The default-true was a temporary flip so the benchmarking bot would exercise the new code path. Revert before merge — opt-in stays the contract until the cost-aware variant lands.
Replace the fixed lower-ratio rule with measurement-driven A/B sampling.
After the initial partial-probe window closes (100k rows by default),
the operator routes the next 10k rows through the passthrough path to
measure `passthrough_ns/row`, then compares it against the previously
measured `partial_ns/row` via a closed-form cost crossover:
cost_keep_partial = partial_ns × N + final_ns × N × ratio
cost_skip = passthrough_ns × N + final_ns × N
assuming final_ns ≈ partial_ns (similar hash-table mechanics):
skip wins ⇔ ratio > passthrough_ns / partial_ns
The crossover is set entirely by the two measured numbers — no magic
constant, no hardcoded ratio. Rule 1 (ratio >= 0.8) still short-circuits
before A/B, preserving the legacy cheap path.
State machine extensions:
- New `ExecutionState::AbSampling` mirrors `SkippingAggregation`
(input → `transform_to_states` → output) but *keeps the partial
hash table* — if A/B decides to keep partial, the stream reverts
to `ReadingInput` and the hash table continues accumulating.
- `ProbePhase` enum (Partial / AbSampling / Locked) inside
`SkipAggregationProbe` drives the transitions.
Diagnostic gauges exposed via EXPLAIN ANALYZE:
- `partial_agg_probe_partial_ns_per_row` — measured at probe close
- `partial_agg_probe_passthrough_ns_per_row` — measured at A/B close
- `partial_agg_probe_ratio_per_mille` — ratio × 1000
- `partial_agg_probe_cost_decision_skip` — 1 if cost said skip, 0 if keep
Config:
- `skip_partial_aggregation_use_cost_model` (bool, default false) —
opt-in switch. With it off, behaviour is exactly the legacy bare
ratio check.
- `skip_partial_aggregation_ab_sampling_rows` (usize, default 10_000) —
size of the A/B sampling window.
- Drops `skip_partial_aggregation_cost_min_ratio` and
`skip_partial_aggregation_cost_ns_per_row` from the previous
iteration of this PR — they were magic-constant gates that the
cost-aware formula obsoletes.
7 `SkipAggregationProbe` unit tests cover:
- cost-model-off matches legacy ratio check
- cost-model-on short-circuits on Rule 1 (no A/B needed)
- A/B sampling entry transition
- cost decision chooses skip when partial expensive
- cost decision chooses keep when passthrough not much cheaper
- A/B window accumulates across multiple batches
- diagnostic gauges record at every transition
Existing 100 aggregate tests + 10 aggregate SLT files still pass.
Same temporary flip as before — benchmarking bot uses default config, so cost-aware A/B sampling would never run otherwise. Revert this commit before merge; the contract stays opt-in until we have data on which to base a default change.
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/adaptive-partial-agg-cost (df6e264) to bdf8a6d (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
Phase 2 of cost-aware partial-agg skip. Instead of one final
decision per partition, the probe rewinds back to the partial-probe
phase after `re_probe_interval_rows` rows (default 1M) and re-runs
the partial+A/B sampling cycle on the next segment. Lets a single
partition oscillate between partial and skip as the data
distribution shifts (dense burst of repeated keys followed by
high-cardinality stretch, etc.).
State machine: `ProbePhase::Locked { should_skip }` becomes
`ProbePhase::Active { should_skip, rows_since_decision }`. Per-batch:
- In keep-partial: `observe_partial_batch` increments
`rows_since_decision`. At the threshold, `start_reprobe` resets
the probe (phase = Partial, counters cleared,
`elapsed_compute_at_probe_start` re-snapshotted, `is_locked` = false).
- In skip: `tick_skip_batch` does the same from the
`SkippingAggregation` exec-state arm. When re-probe fires, the
main loop transitions back to `ReadingInput` so the partial-agg
path runs on the next batch (fresh hash table, since the previous
one was emitted on entry to skip).
Final-agg correctness is unaffected: each segment's output (be it
emitted partial state or per-row passthrough state) is associative-
commutative and merges naturally downstream.
New config:
- `skip_partial_aggregation_re_probe_interval_rows` (usize,
default 1_000_000). Set to 0 to disable re-probing entirely
(one-shot decision, the Phase 1 behaviour).
New diagnostic counter:
- `partial_agg_probe_segment_count` — number of completed segments
in the current partition. 0 means the probe ran once and never
re-probed; a large value on a fast query suggests the interval is
too small.
Three new `SkipAggregationProbe` unit tests cover:
- re-probe after a committed skip decision rewinds to `Partial`
- re-probe after a committed keep decision rewinds to `Partial`
- `re_probe_interval_rows = 0` disables re-probing
Existing test `test_skip_aggregation_probe_not_locked_until_skip`
explicitly disables the cost-aware path (it exercises a legacy
Rule 1 corner case the cost model would intercept differently).
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/adaptive-partial-agg-cost (c716f33) to 2453bec (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
This reverts commit 44f815a.
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/adaptive-partial-agg-cost (a258afe) to 2453bec (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
The Phase 2 revert took out the previous config-disable line. Test exercises the original Rule 1 short-circuit behaviour, so it needs to opt out of the default-on A/B sampling explicitly.
Which issue does this PR close?
Phase 1 prototype for #22405.
Rationale for this change
The current
skip_partial_aggregation_probe_ratio_threshold(default 0.8) is a single fixed knob: when measurednum_groups / input_rows≥ 0.8, partial aggregation skips. This catches the no-reduction case but misses the medium-ratio band (~0.5–0.7) where partial aggregation is still net-negative because per-row cost is high — heavy variable-length keys, complex aggregates, etc.ClickBench Q18 is the motivating example. Measured ratio is 0.565, well below 0.8, so partial aggregation keeps running and burns ~17 s of compute across 12 partitions for only ~40 % reduction. Lowering the global threshold to 0.6 fixes Q18 but is likely to regress lower-cost queries that benefit from partial agg at that ratio.
This PR replaces the static threshold guess with measured A/B sampling: probe the per-row cost of both partial agg and passthrough, then pick the cheaper path via a closed-form cost comparison. No magic constants.
How it works
After the existing partial probe window (
probe_rows_threshold, default 100k rows) closes:Partial probe — measure
partial_ns_per_rowfrom the partial-agg path so far, and computeratio = num_groups / input_rows.Rule 1 short-circuit —
ratio ≥ probe_ratio_threshold (0.8)skips immediately (existing behaviour, preserved for compatibility and to save the A/B window when the answer is obvious).A/B sampling — when Rule 1 doesn't fire, route the next
ab_sampling_rows(default 10k) through the passthrough (transform_to_states) path. The hash table is preserved; the passthrough output is sent downstream and merges naturally in Final agg.Cost decision — at the end of the A/B window, measure
passthrough_ns_per_rowand apply:Derived from
cost_keep_partial = partial × N + final × N × ratiovscost_skip = passthrough × N + final × N, assumingfinal ≈ partial(same hash-table mechanics).If the decision is skip, emit the partial hash table and continue via
SkippingAggregation. If keep, return toReadingInputand the hash table continues accumulating.The crossover is set entirely by the two measured numbers — no magic threshold, automatically adapts to hardware and query shape.
Benchmark (ClickBench partitioned, ARM Neoverse-V2 12 vCPU)
What changes are included in this PR?
SkipAggregationProbeextended with a phased state machine:(
ExecutionState::AbSamplingmirrorsSkippingAggregation— input goes throughtransform_to_states— but keeps the partial hash table.)New
datafusion.execution.*config:skip_partial_aggregation_use_cost_model(bool, default true) — turns the A/B path on. Set false to fall back to the bare ratio check.skip_partial_aggregation_ab_sampling_rows(usize, default 10000) — size of the passthrough sample window.New EXPLAIN ANALYZE diagnostic gauges so users (and follow-up tuning work) can see what the probe is doing per-partition:
partial_agg_probe_partial_ns_per_rowpartial_agg_probe_passthrough_ns_per_rowpartial_agg_probe_ratio_per_mille(ratio × 1000, integer storage)partial_agg_probe_cost_decision_skip(1 = cost said skip, 0 = cost said keep)Are these changes tested?
Seven
SkipAggregationProbeunit tests:skip_probe_cost_model_off_matches_legacy_ratio_check— bare ratio check unchanged when cost model is off.skip_probe_cost_model_short_circuits_on_high_ratio— Rule 1 still wins over A/B.skip_probe_enters_ab_sampling_when_partial_window_closes— A/B transition.skip_probe_cost_decision_chooses_skip_when_partial_is_expensive— cost crossover (skip).skip_probe_cost_decision_chooses_keep_when_passthrough_not_much_cheaper— cost crossover (keep).skip_probe_ab_window_accumulates_across_batches— sampling spans multiple input batches.skip_probe_records_diagnostic_gauges— diagnostic metrics fire as expected.Existing 100 aggregate tests + 10 aggregate SLT files still pass;
cargo clippy -p datafusion-physical-plan --all-targets -- -D warningsclean.Are there any user-facing changes?
Two additive
datafusion.execution.*config options. Default behaviour for the cost-aware path is on based on the benchmark above; can be opted out viaSET datafusion.execution.skip_partial_aggregation_use_cost_model = false.Followups
Segment-level re-probing (was attempted in this PR but reverted — see commits
44f815a87,c506a81fb). The current implementation makes one A/B decision per partition. Re-probing every N rows would let a single partition switch direction as the data distribution shifts. Implementation hit a pre-existingGroupValuesissue:emit(EmitTo::All)clears the per-column arrays but the hash→index map appears to retain stale entries, panicking on subsequent partial-agg inserts atmulti_group_by/primitive.rs:156. Should be tackled as a follow-up after that reset semantic is sorted out.The simplifying assumption
final_ns ≈ partial_nsin the cost formula is reasonable but not exact. A more refined model could track Final-agg per-row cost separately. Possible follow-up if measured data (via the diagnostic gauges above) shows the assumption costs us in some workload.