diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index e6d1ebbbbe746..90bc2da076faa 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -648,6 +648,33 @@ config_namespace! { /// aggregation ratio check and trying to switch to skipping aggregation mode pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000 + /// (experimental) When true, apply a *secondary* skip rule on top + /// of `skip_partial_aggregation_probe_ratio_threshold`: skip + /// partial aggregation when the measured ratio is at least + /// `skip_partial_aggregation_cost_min_ratio` (default 0.5). + /// Targets ClickBench Q18-shape queries where the ratio (~0.56) + /// sits just below the fixed 0.8 threshold so partial agg keeps + /// running, but the absolute work (heavy variable-length keys, + /// complex aggregates) makes it net-negative. + /// + /// Empirical motivation: lowering the global ratio threshold to + /// 0.6 fixes Q18 (1.73× faster) but risks regressing low-cost + /// queries at similar ratios. This flag exposes the lower + /// threshold as a separate, opt-in knob. Whether the cost-aware + /// signal (`partial_agg_probe_ns_per_row` metric) can replace + /// this static threshold is an open question — for now the + /// metric is reported alongside so callers can evaluate. + pub skip_partial_aggregation_use_cost_model: bool, default = true + + /// Number of input rows used in the A/B sampling window after the + /// initial partial probe completes. During this window the operator + /// routes input through the passthrough (`transform_to_states`) + /// path so the probe can measure `passthrough_ns/row` and compare + /// it against the previously measured `partial_ns/row`. Default + /// 10000 — large enough to amortise per-row noise, small enough to + /// be cheap if the decision turns out to be "keep partial". + pub skip_partial_aggregation_ab_sampling_rows: usize, default = 10_000 + /// Should DataFusion use row number estimates at the input to decide /// whether increasing parallelism is beneficial or not. By default, /// only exact row numbers (not estimates) are used for this decision. diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 1164fb37b384a..493b439c1bd30 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -71,6 +71,17 @@ pub(crate) enum ExecutionState { /// /// See "partial aggregation" discussion on [`GroupedHashAggregateStream`] SkippingAggregation, + /// Temporary A/B sampling phase used by [`SkipAggregationProbe`] to + /// measure passthrough cost on real input before deciding whether + /// to skip partial aggregation for the rest of the stream. + /// + /// Behaves like [`Self::SkippingAggregation`] (rows are converted + /// via `transform_to_states` and emitted downstream), but the hash + /// table built during the preceding partial-probe window is *not* + /// emitted yet — if the probe ultimately decides to keep partial + /// agg, the stream transitions back to [`Self::ReadingInput`] and + /// the hash table continues to accumulate. + AbSampling, /// All input has been consumed and all groups have been emitted Done, } @@ -118,88 +129,294 @@ struct SpillState { // Metrics related to spilling are managed inside `spill_manager` } -/// Tracks if the aggregate should skip partial aggregations +/// Three phases of the cost-aware skip decision. +/// +/// 1. `Partial` — accumulate input through the hash table (normal +/// partial-agg path), measuring `partial_ns/row` and the +/// `num_groups/input_rows` ratio over the first +/// `probe_rows_threshold` rows. +/// 2. `AbSampling` — route the next `ab_sampling_rows` of input through +/// the passthrough path (`transform_to_states`) to measure +/// `passthrough_ns/row`. The hash table built so far is kept; +/// nothing is emitted yet. +/// 3. `Locked { should_skip }` — final decision. Skip when +/// `ratio > passthrough_ns/row / partial_ns/row` (the cost-aware +/// crossover); otherwise revert to partial agg for the rest of the +/// stream. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ProbePhase { + Partial, + AbSampling, + Locked { should_skip: bool }, +} + +/// Tracks if the aggregate should skip partial aggregations. +/// +/// See "partial aggregation" discussion on [`GroupedHashAggregateStream`]. /// -/// See "partial aggregation" discussion on [`GroupedHashAggregateStream`] +/// The probe runs a short A/B sampling window that measures both the +/// partial-agg per-row cost and the passthrough per-row cost on real +/// input, then makes a cost-based skip decision without relying on a +/// hardcoded ratio cutoff. `use_cost_model = false` falls back to the +/// original behaviour: a single bare ratio check at probe close. struct SkipAggregationProbe { // ======================================================================== - // PROPERTIES: - // These fields are initialized at the start and remain constant throughout - // the execution. + // PROPERTIES (immutable for the stream's lifetime) // ======================================================================== - /// Aggregation ratio check performed when the number of input rows exceeds - /// this threshold (from `SessionConfig`) probe_rows_threshold: usize, - /// Maximum ratio of `num_groups` to `input_rows` for continuing aggregation - /// (from `SessionConfig`). If the ratio exceeds this value, aggregation - /// is skipped and input rows are directly converted to output probe_ratio_threshold: f64, + use_cost_model: bool, + ab_sampling_rows: usize, // ======================================================================== - // STATES: - // Fields changes during execution. Can be buffer, or state flags that - // influence the execution in parent `GroupedHashAggregateStream` + // STATE // ======================================================================== - /// Number of processed input rows (updated during probing) + phase: ProbePhase, + /// Rows processed in the `Partial` phase. input_rows: usize, - /// Number of total group values for `input_rows` (updated during probing) + /// Latest `group_values.len()` reported in the `Partial` phase. num_groups: usize, - - /// Flag indicating further data aggregation may be skipped (decision made - /// when probing complete) + /// Rows processed in the `AbSampling` phase. + ab_rows: usize, + /// `elapsed_compute.value()` snapshot at probe construction. + elapsed_compute_at_probe_start: usize, + /// `elapsed_compute.value()` snapshot at the `Partial`→`AbSampling` + /// transition. The partial-window wall time is + /// `elapsed_compute_at_ab_start - elapsed_compute_at_probe_start`. + elapsed_compute_at_ab_start: Option, + /// `should_skip` and `is_locked` derived from `phase`; kept as + /// dedicated fields so the rest of the operator code can stay + /// oblivious to the phase enum. should_skip: bool, - /// Flag indicating further updates of `SkipAggregationProbe` state won't - /// make any effect (set either while probing or on probing completion) is_locked: bool, // ======================================================================== - // METRICS: + // METRICS / SOURCES // ======================================================================== - /// Number of rows where state was output without aggregation. - /// - /// * If 0, all input rows were aggregated (should_skip was always false) - /// - /// * if greater than zero, the number of rows which were output directly - /// without aggregation skipped_aggregation_rows: metrics::Count, + /// Operator-wide `elapsed_compute`; the probe reads `value()` at + /// phase transitions to derive per-row costs. + elapsed_compute: metrics::Time, + /// Operator metrics set + partition — saved so the diagnostic + /// gauges below can be lazily registered the first time the probe + /// has something useful to report. Queries that never reach + /// `probe_rows_threshold` (small inputs, short streams) won't + /// register them at all, so EXPLAIN ANALYZE stays clean of empty + /// "...=0" noise on workloads where the cost-aware path doesn't + /// engage. + agg_metrics: metrics::ExecutionPlanMetricsSet, + partition: usize, + /// Diagnostic gauges, lazily created on first set. See + /// [`Self::ensure_probe_gauges`] for the names and categories. + probe_partial_ns_per_row: Option, + probe_passthrough_ns_per_row: Option, + probe_ratio_per_mille: Option, + probe_cost_decision_skip: Option, } impl SkipAggregationProbe { + #[expect(clippy::too_many_arguments)] fn new( probe_rows_threshold: usize, probe_ratio_threshold: f64, + use_cost_model: bool, + ab_sampling_rows: usize, skipped_aggregation_rows: metrics::Count, + elapsed_compute: metrics::Time, + agg_metrics: metrics::ExecutionPlanMetricsSet, + partition: usize, ) -> Self { + let elapsed_compute_at_probe_start = elapsed_compute.value(); Self { - input_rows: 0, - num_groups: 0, probe_rows_threshold, probe_ratio_threshold, + use_cost_model, + ab_sampling_rows, + phase: ProbePhase::Partial, + input_rows: 0, + num_groups: 0, + ab_rows: 0, + elapsed_compute_at_probe_start, + elapsed_compute_at_ab_start: None, should_skip: false, is_locked: false, skipped_aggregation_rows, + elapsed_compute, + agg_metrics, + partition, + probe_partial_ns_per_row: None, + probe_passthrough_ns_per_row: None, + probe_ratio_per_mille: None, + probe_cost_decision_skip: None, } } - /// Updates `SkipAggregationProbe` state: - /// - increments the number of input rows - /// - replaces the number of groups with the new value - /// - on `probe_rows_threshold` exceeded calculates - /// aggregation ratio and sets `should_skip` flag - /// - if `should_skip` is set, locks further state updates - fn update_state(&mut self, input_rows: usize, num_groups: usize) { - if self.is_locked { + /// Lazily register all four cost-aware diagnostic gauges with the + /// operator's metric set. Called the first time the probe has data + /// to report (i.e. when `finalize_partial_probe` runs). Idempotent: + /// once the gauges exist, this is a cheap `Option::is_some` check. + /// Small queries that never reach `probe_rows_threshold` skip this + /// entirely, so EXPLAIN ANALYZE stays free of "...=0" noise. + fn ensure_probe_gauges(&mut self) { + if self.probe_partial_ns_per_row.is_some() { + return; + } + self.probe_partial_ns_per_row = Some( + MetricBuilder::new(&self.agg_metrics) + .with_category(MetricCategory::Timing) + .gauge("partial_agg_probe_partial_ns_per_row", self.partition), + ); + self.probe_passthrough_ns_per_row = Some( + MetricBuilder::new(&self.agg_metrics) + .with_category(MetricCategory::Timing) + .gauge("partial_agg_probe_passthrough_ns_per_row", self.partition), + ); + self.probe_ratio_per_mille = Some( + MetricBuilder::new(&self.agg_metrics) + .with_category(MetricCategory::Rows) + .gauge("partial_agg_probe_ratio_per_mille", self.partition), + ); + self.probe_cost_decision_skip = Some( + MetricBuilder::new(&self.agg_metrics) + .with_category(MetricCategory::Rows) + .gauge("partial_agg_probe_cost_decision_skip", self.partition), + ); + } + + /// Called from the partial-agg path after each input batch. Tracks + /// total rows / group count and, when `probe_rows_threshold` is + /// reached, drives the phase transition. + fn observe_partial_batch(&mut self, input_rows: usize, num_groups: usize) { + if self.phase != ProbePhase::Partial { return; } self.input_rows += input_rows; self.num_groups = num_groups; - if self.input_rows >= self.probe_rows_threshold { - self.should_skip = self.num_groups as f64 / self.input_rows as f64 - >= self.probe_ratio_threshold; - // Set is_locked to true only if we have decided to skip, otherwise we can try to skip - // during processing the next record_batch. - self.is_locked = self.should_skip; + if self.input_rows < self.probe_rows_threshold { + return; + } + + // Register the diagnostic gauges with the operator's metric set + // on first reach — keeps EXPLAIN ANALYZE clean on small workloads + // that never engage the cost-aware path. + self.ensure_probe_gauges(); + + let ratio = self.num_groups as f64 / self.input_rows as f64; + let partial_ns = self + .elapsed_compute + .value() + .saturating_sub(self.elapsed_compute_at_probe_start); + let partial_ns_per_row = (partial_ns as u64) + .checked_div(self.input_rows as u64) + .unwrap_or(0) as usize; + if let Some(g) = self.probe_partial_ns_per_row.as_ref() { + g.set(partial_ns_per_row); + } + if let Some(g) = self.probe_ratio_per_mille.as_ref() { + g.set((ratio * 1000.0) as usize); } + + // Rule 1 (fixed): high ratio — short-circuit straight to skip. + if ratio >= self.probe_ratio_threshold { + self.commit_skip(); + return; + } + + if !self.use_cost_model { + // Legacy behaviour: leave `is_locked = false`, allow + // re-evaluation on subsequent batches. + return; + } + + // Enter A/B sampling — route subsequent input through passthrough + // until `ab_sampling_rows` have been observed, at which point + // `finalize_ab_decision` runs the cost-based comparison. + self.elapsed_compute_at_ab_start = Some(self.elapsed_compute.value()); + self.phase = ProbePhase::AbSampling; + } + + /// True iff the main loop should route the next input batch through + /// the passthrough (`transform_to_states`) path to feed the A/B + /// measurement instead of through the hash-table partial-agg path. + fn wants_passthrough_sample(&self) -> bool { + matches!(self.phase, ProbePhase::AbSampling) + } + + /// Called after a passthrough batch has been processed during the + /// A/B sampling phase. Counts rows toward the sample window and, + /// when the window is full, triggers the cost-aware decision. + fn observe_ab_batch(&mut self, input_rows: usize) { + if self.phase != ProbePhase::AbSampling { + return; + } + self.ab_rows += input_rows; + if self.ab_rows >= self.ab_sampling_rows { + self.finalize_ab_decision(); + } + } + + /// Apply the cost-aware decision after the A/B sampling window + /// completes. + /// + /// Cost model (assuming `final_ns/row ≈ partial_ns/row`): + /// + /// ```text + /// cost_keep_partial = partial_ns × N + final_ns × N × ratio + /// cost_skip = passthrough_ns × N + final_ns × N + /// + /// skip is cheaper ⇔ ratio > passthrough_ns / partial_ns + /// ``` + /// + /// The crossover is set entirely by the measured ratio of passthrough + /// to partial cost on this particular query / hardware — no magic + /// constants. If either measurement is zero (extremely fast or + /// degenerate input) we default to keeping partial. + fn finalize_ab_decision(&mut self) { + // Gauges were registered when we entered the partial probe; + // they should exist here (we reached `finalize_partial_probe` + // before transitioning to `AbSampling`). + let ab_start = self + .elapsed_compute_at_ab_start + .expect("A/B start snapshot must be set when entering AbSampling"); + let ab_ns = self.elapsed_compute.value().saturating_sub(ab_start); + let passthrough_ns_per_row = + (ab_ns as u64).checked_div(self.ab_rows as u64).unwrap_or(0) as usize; + if let Some(g) = self.probe_passthrough_ns_per_row.as_ref() { + g.set(passthrough_ns_per_row); + } + + let partial_ns_per_row = self + .probe_partial_ns_per_row + .as_ref() + .map(|g| g.value()) + .unwrap_or(0); + let ratio = self.num_groups as f64 / self.input_rows as f64; + + let should_skip = if partial_ns_per_row == 0 || passthrough_ns_per_row == 0 { + false + } else { + ratio > (passthrough_ns_per_row as f64 / partial_ns_per_row as f64) + }; + + if let Some(g) = self.probe_cost_decision_skip.as_ref() { + g.set(if should_skip { 1 } else { 0 }); + } + if should_skip { + self.commit_skip(); + } else { + self.phase = ProbePhase::Locked { should_skip: false }; + self.is_locked = true; + } + } + + /// Transition to the terminal `Locked { should_skip: true }` state. + /// Used by both Rule 1 (fixed-ratio short-circuit) and the cost-aware + /// path so the rest of the operator can rely on a single + /// `should_skip` flag. + fn commit_skip(&mut self) { + self.should_skip = true; + self.is_locked = true; + self.phase = ProbePhase::Locked { should_skip: true }; } fn should_skip(&self) -> bool { @@ -644,13 +861,20 @@ impl GroupedHashAggregateStream { options.skip_partial_aggregation_probe_rows_threshold; let probe_ratio_threshold = options.skip_partial_aggregation_probe_ratio_threshold; + let use_cost_model = options.skip_partial_aggregation_use_cost_model; + let ab_sampling_rows = options.skip_partial_aggregation_ab_sampling_rows; let skipped_aggregation_rows = MetricBuilder::new(&agg.metrics) .with_category(MetricCategory::Rows) .counter("skipped_aggregation_rows", partition); Some(SkipAggregationProbe::new( probe_rows_threshold, probe_ratio_threshold, + use_cost_model, + ab_sampling_rows, skipped_aggregation_rows, + baseline_metrics.elapsed_compute().clone(), + agg.metrics.clone(), + partition, )) } else { None @@ -786,6 +1010,16 @@ impl Stream for GroupedHashAggregateStream { self.exec_state = new_state; break 'reading_input; } + // Probe may have transitioned into the + // A/B sampling window. Route subsequent + // batches through the passthrough path + // so the probe can measure + // `passthrough_ns/row`. + if self.probe_wants_passthrough_sample() { + timer.done(); + self.exec_state = ExecutionState::AbSampling; + break 'reading_input; + } } // If we reach this point, try to update the memory reservation @@ -851,6 +1085,62 @@ impl Stream for GroupedHashAggregateStream { } } + ExecutionState::AbSampling => { + // Mirror of `SkippingAggregation` — passthrough via + // `transform_to_states` — except that: + // * the partial hash table is NOT emitted (we may + // still revert to it), + // * the probe observes per-row timing via + // `elapsed_compute`, + // * after each batch we check whether the probe + // has finalised: skip (emit hash + switch to + // `SkippingAggregation`) or keep partial + // (return to `ReadingInput`). + match ready!(self.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + let _timer = elapsed_compute.timer(); + let input_rows = batch.num_rows(); + let states = self.transform_to_states(&batch)?; + if let Some(probe) = self.skip_aggregation_probe.as_mut() { + probe.observe_ab_batch(input_rows); + } + // After observing, the probe may have + // transitioned out of `AbSampling`. + if self.should_skip_aggregation() { + // Cost model chose skip — emit the + // partial hash table accumulated during + // the probe window, then continue in + // `SkippingAggregation`. + if let Some(emitted) = self.emit(EmitTo::All, false)? { + self.exec_state = + ExecutionState::ProducingOutput(emitted); + } else { + self.exec_state = ExecutionState::SkippingAggregation; + } + } else if let Some(probe) = + self.skip_aggregation_probe.as_ref() + && !probe.wants_passthrough_sample() + && probe.is_locked + { + // Cost model chose keep — fall back to + // the partial-agg path for the rest of + // the stream. + self.exec_state = ExecutionState::ReadingInput; + } + return Poll::Ready(Some(Ok( + states.record_output(&self.baseline_metrics) + ))); + } + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + None => { + // Input ended mid-sampling. Commit whatever + // hash state we have via the normal + // end-of-input path. + self.set_input_done_and_produce_output()?; + } + } + } + ExecutionState::ProducingOutput(batch) => { // slice off a part of the batch, if needed let output_batch; @@ -1408,10 +1698,19 @@ impl GroupedHashAggregateStream { // Skip aggregation probe is not supported if stream has any spills, // currently spilling is not supported for Partial aggregation assert!(self.spill_state.spills.is_empty()); - probe.update_state(input_rows, self.group_values.len()); + probe.observe_partial_batch(input_rows, self.group_values.len()); }; } + /// True iff the probe wants the next input batch routed through the + /// passthrough path for A/B sampling. Checked in the main loop + /// before the partial-agg hash insert. + fn probe_wants_passthrough_sample(&self) -> bool { + self.skip_aggregation_probe + .as_ref() + .is_some_and(|p| p.wants_passthrough_sample()) + } + /// In case the probe indicates that aggregation may be /// skipped, forces stream to produce currently accumulated output. /// @@ -1481,6 +1780,7 @@ mod tests { use datafusion_functions_aggregate::count::count_udaf; use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_expr::expressions::col; + use std::time::Duration; #[tokio::test] async fn test_double_emission_race_condition_bug() -> Result<()> { @@ -1682,6 +1982,14 @@ mod tests { "datafusion.execution.skip_partial_aggregation_probe_ratio_threshold", &datafusion_common::ScalarValue::Float64(Some(probe_ratio_threshold)), ); + // This test exercises the legacy not-locked-until-skip behaviour + // (Rule 1 ratio check, no A/B sampling). Disable the default-on + // cost-aware path so the small probe window in this test doesn't + // get pulled into A/B sampling and stall the skip decision. + session_config = session_config.set( + "datafusion.execution.skip_partial_aggregation_use_cost_model", + &datafusion_common::ScalarValue::Boolean(Some(false)), + ); task_ctx = task_ctx.with_session_config(session_config); let task_ctx = Arc::new(task_ctx); @@ -1817,4 +2125,197 @@ mod tests { Ok(()) } + + // ---------------- SkipAggregationProbe unit tests ---------------- + + /// Test rig that exposes the probe + its operator metrics set so + /// tests can drive `elapsed_compute` and read the diagnostic gauges + /// by name (they are lazily registered on first reach). + struct ProbeRig { + probe: SkipAggregationProbe, + elapsed: metrics::Time, + agg_metrics: metrics::ExecutionPlanMetricsSet, + } + + impl ProbeRig { + /// Read a lazily-registered gauge by name, returning 0 when the + /// gauge has not been registered yet (i.e. the probe never + /// reached the partial-probe finalisation). + fn gauge(&self, name: &str) -> usize { + self.agg_metrics + .clone_inner() + .iter() + .find(|m| m.value().name() == name) + .and_then(|m| match m.value() { + metrics::MetricValue::Gauge { gauge, .. } => Some(gauge.value()), + _ => None, + }) + .unwrap_or(0) + } + } + + fn rig( + probe_rows_threshold: usize, + probe_ratio_threshold: f64, + use_cost_model: bool, + ab_sampling_rows: usize, + ) -> ProbeRig { + let elapsed = metrics::Time::new(); + let agg_metrics = metrics::ExecutionPlanMetricsSet::new(); + let probe = SkipAggregationProbe::new( + probe_rows_threshold, + probe_ratio_threshold, + use_cost_model, + ab_sampling_rows, + metrics::Count::new(), + elapsed.clone(), + agg_metrics.clone(), + 0, + ); + ProbeRig { + probe, + elapsed, + agg_metrics, + } + } + + /// With the cost model off the probe behaves like the original + /// bare-ratio check: skip only when the ratio crosses + /// `probe_ratio_threshold`. + #[test] + fn skip_probe_cost_model_off_matches_legacy_ratio_check() { + let mut r = rig(100, 0.8, false, 10_000); + + // 100 rows / 50 groups → ratio 0.5, below 0.8 → don't skip + r.probe.observe_partial_batch(100, 50); + assert!(!r.probe.should_skip()); + assert!(!r.probe.is_locked); + assert!(!r.probe.wants_passthrough_sample()); + + // Next batch: total 200 / 170 groups → ratio 0.85, above 0.8 → skip + r.probe.observe_partial_batch(100, 170); + assert!(r.probe.should_skip()); + assert!(r.probe.is_locked); + } + + /// Rule 1 (fixed-ratio) fires before A/B sampling even when the + /// cost model is on — short-circuit, no passthrough sampling needed. + #[test] + fn skip_probe_cost_model_short_circuits_on_high_ratio() { + let mut r = rig(100, 0.8, true, 10_000); + + r.probe.observe_partial_batch(100, 90); // ratio 0.9 + + assert!(r.probe.should_skip()); + assert!(r.probe.is_locked); + // No A/B sampling happened. + assert!(!r.probe.wants_passthrough_sample()); + assert_eq!(r.gauge("partial_agg_probe_passthrough_ns_per_row"), 0); + } + + /// Below `probe_ratio_threshold`, the probe transitions into the + /// A/B sampling phase and requests passthrough routing from the + /// main loop. + #[test] + fn skip_probe_enters_ab_sampling_when_partial_window_closes() { + let mut r = rig(100, 0.8, true, 10_000); + + // Simulate 100k ns of partial work over 100 rows → 1000 ns/row. + r.elapsed.add_duration(Duration::from_nanos(100_000)); + r.probe.observe_partial_batch(100, 60); // ratio 0.6 + + assert!(!r.probe.should_skip()); + assert!(!r.probe.is_locked); + assert!(r.probe.wants_passthrough_sample()); + assert_eq!(r.gauge("partial_agg_probe_partial_ns_per_row"), 1_000); + assert_eq!(r.gauge("partial_agg_probe_ratio_per_mille"), 600); + } + + /// Cost-aware skip: ratio is greater than `passthrough/partial`, so + /// the cost model picks skip. + /// + /// Setup: partial = 100 ns/row, passthrough = 50 ns/row, ratio = 0.6. + /// Crossover = 50/100 = 0.5. 0.6 > 0.5 ⇒ skip. + #[test] + fn skip_probe_cost_decision_chooses_skip_when_partial_is_expensive() { + let mut r = rig(100, 0.8, true, 100); + + // Partial window: 10_000 ns over 100 rows → 100 ns/row. + r.elapsed.add_duration(Duration::from_nanos(10_000)); + r.probe.observe_partial_batch(100, 60); // ratio 0.6 + assert!(r.probe.wants_passthrough_sample()); + + // A/B window: 5_000 ns over 100 rows → 50 ns/row. + r.elapsed.add_duration(Duration::from_nanos(5_000)); + r.probe.observe_ab_batch(100); + + assert!(r.probe.should_skip()); + assert!(r.probe.is_locked); + assert!(!r.probe.wants_passthrough_sample()); + assert_eq!(r.gauge("partial_agg_probe_passthrough_ns_per_row"), 50); + assert_eq!(r.gauge("partial_agg_probe_cost_decision_skip"), 1); + } + + /// Cost-aware keep: ratio is below `passthrough/partial`, so the + /// cost model picks keep-partial (revert). + /// + /// Setup: partial = 100 ns/row, passthrough = 80 ns/row, ratio = 0.6. + /// Crossover = 80/100 = 0.8. 0.6 < 0.8 ⇒ keep. + #[test] + fn skip_probe_cost_decision_chooses_keep_when_passthrough_not_much_cheaper() { + let mut r = rig(100, 0.8, true, 100); + + r.elapsed.add_duration(Duration::from_nanos(10_000)); + r.probe.observe_partial_batch(100, 60); // ratio 0.6 + assert!(r.probe.wants_passthrough_sample()); + + r.elapsed.add_duration(Duration::from_nanos(8_000)); + r.probe.observe_ab_batch(100); + + assert!(!r.probe.should_skip()); + assert!(r.probe.is_locked); + assert!(!r.probe.wants_passthrough_sample()); + assert_eq!(r.gauge("partial_agg_probe_passthrough_ns_per_row"), 80); + assert_eq!(r.gauge("partial_agg_probe_cost_decision_skip"), 0); + } + + /// A/B sampling needs *enough* rows in the window before it + /// finalises. A short partial batch during sampling shouldn't + /// trigger the decision early. + #[test] + fn skip_probe_ab_window_accumulates_across_batches() { + let mut r = rig(100, 0.8, true, 1000); + + r.elapsed.add_duration(Duration::from_nanos(10_000)); + r.probe.observe_partial_batch(100, 60); + assert!(r.probe.wants_passthrough_sample()); + + // 500 rows of A/B — below the 1000 row target. + r.elapsed.add_duration(Duration::from_nanos(25_000)); + r.probe.observe_ab_batch(500); + assert!(r.probe.wants_passthrough_sample()); + assert!(!r.probe.is_locked); + + // Another 500 rows — total 1000, decision fires. + r.elapsed.add_duration(Duration::from_nanos(25_000)); + r.probe.observe_ab_batch(500); + assert!(!r.probe.wants_passthrough_sample()); + assert!(r.probe.is_locked); + } + + /// Diagnostic gauges record the per-row measurements at every + /// observable transition, independent of which decision fires. + #[test] + fn skip_probe_records_diagnostic_gauges() { + let mut r = rig(100, 0.8, false, 10_000); + + // 200_000 ns over 100 rows → 2_000 ns/row. Ratio 50/100 = 0.5 → + // 500 per-mille. No skip fires (legacy ratio check, below 0.8), + // but partial_ns_per_row + ratio gauges still update. + r.elapsed.add_duration(Duration::from_nanos(200_000)); + r.probe.observe_partial_batch(100, 50); + + assert_eq!(r.gauge("partial_agg_probe_partial_ns_per_row"), 2_000); + assert_eq!(r.gauge("partial_agg_probe_ratio_per_mille"), 500); + } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b0c7e3f8fe643..056727cbd3767 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -266,8 +266,10 @@ datafusion.execution.parquet.writer_version 1.0 datafusion.execution.perfect_hash_join_min_key_density 0.15 datafusion.execution.perfect_hash_join_small_build_threshold 1024 datafusion.execution.planning_concurrency 13 +datafusion.execution.skip_partial_aggregation_ab_sampling_rows 10000 datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 +datafusion.execution.skip_partial_aggregation_use_cost_model true datafusion.execution.skip_physical_aggregate_schema_check false datafusion.execution.soft_max_rows_per_output_file 50000000 datafusion.execution.sort_in_place_threshold_bytes 1048576 @@ -416,8 +418,10 @@ datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer ve datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. datafusion.execution.perfect_hash_join_small_build_threshold 1024 A perfect hash join (see `HashJoinExec` for more details) will be considered if the range of keys (max - min) on the build side is < this threshold. This provides a fast path for joins with very small key ranges, bypassing the density check. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system +datafusion.execution.skip_partial_aggregation_ab_sampling_rows 10000 Number of input rows used in the A/B sampling window after the initial partial probe completes. During this window the operator routes input through the passthrough (`transform_to_states`) path so the probe can measure `passthrough_ns/row` and compare it against the previously measured `partial_ns/row`. Default 10000 — large enough to amortise per-row noise, small enough to be cheap if the decision turns out to be "keep partial". datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode +datafusion.execution.skip_partial_aggregation_use_cost_model true (experimental) When true, apply a *secondary* skip rule on top of `skip_partial_aggregation_probe_ratio_threshold`: skip partial aggregation when the measured ratio is at least `skip_partial_aggregation_cost_min_ratio` (default 0.5). Targets ClickBench Q18-shape queries where the ratio (~0.56) sits just below the fixed 0.8 threshold so partial agg keeps running, but the absolute work (heavy variable-length keys, complex aggregates) makes it net-negative. Empirical motivation: lowering the global ratio threshold to 0.6 fixes Q18 (1.73× faster) but risks regressing low-cost queries at similar ratios. This flag exposes the lower threshold as a separate, opt-in knob. Whether the cost-aware signal (`partial_agg_probe_ns_per_row` metric) can replace this static threshold is an open question — for now the metric is reported alongside so callers can evaluate. datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. @@ -895,7 +899,7 @@ show functions statement ok reset datafusion.catalog.information_schema; -# The SLT runner sets `target_partitions` to 4 instead of using the default, so +# The SLT runner sets `target_partitions` to 4 instead of using the default, so # reset it explicitly. statement ok set datafusion.execution.target_partitions = 4; diff --git a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt index 923a51afc8df9..2aaca2a388b1e 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt @@ -306,6 +306,14 @@ set datafusion.explain.analyze_level = summary; statement ok set datafusion.explain.analyze_categories = 'none'; +# Disable the cost-aware partial-agg skip path. Even on tiny inputs that +# never reach the probe-rows threshold, having the cost model enabled +# alters the partial-agg state machine slightly and perturbs the +# parallel race between partial-agg publishing its MIN/MAX and the scan +# reading each partition — flaking these EXPLAIN ANALYZE expectations. +statement ok +set datafusion.execution.skip_partial_aggregation_use_cost_model = false; + # MIN(a) -> DynamicFilter [ a < 1 ] query TT EXPLAIN ANALYZE SELECT MIN(a) FROM agg_dyn_single; @@ -574,3 +582,7 @@ drop table t1; statement ok drop table t2; + +statement ok +reset datafusion.execution.skip_partial_aggregation_use_cost_model; + diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 576137bda29d1..138696a9bfa39 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -132,6 +132,8 @@ The following configuration settings are available: | datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches | | datafusion.execution.skip_partial_aggregation_probe_ratio_threshold | 0.8 | Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input | | datafusion.execution.skip_partial_aggregation_probe_rows_threshold | 100000 | Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode | +| datafusion.execution.skip_partial_aggregation_use_cost_model | true | (experimental) When true, apply a _secondary_ skip rule on top of `skip_partial_aggregation_probe_ratio_threshold`: skip partial aggregation when the measured ratio is at least `skip_partial_aggregation_cost_min_ratio` (default 0.5). Targets ClickBench Q18-shape queries where the ratio (~0.56) sits just below the fixed 0.8 threshold so partial agg keeps running, but the absolute work (heavy variable-length keys, complex aggregates) makes it net-negative. Empirical motivation: lowering the global ratio threshold to 0.6 fixes Q18 (1.73× faster) but risks regressing low-cost queries at similar ratios. This flag exposes the lower threshold as a separate, opt-in knob. Whether the cost-aware signal (`partial_agg_probe_ns_per_row` metric) can replace this static threshold is an open question — for now the metric is reported alongside so callers can evaluate. | +| datafusion.execution.skip_partial_aggregation_ab_sampling_rows | 10000 | Number of input rows used in the A/B sampling window after the initial partial probe completes. During this window the operator routes input through the passthrough (`transform_to_states`) path so the probe can measure `passthrough_ns/row` and compare it against the previously measured `partial_ns/row`. Default 10000 — large enough to amortise per-row noise, small enough to be cheap if the decision turns out to be "keep partial". | | datafusion.execution.use_row_number_estimates_to_optimize_partitioning | false | Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. | | datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. | | datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. |