From 218c19be5d98e45e4d551b55ca8b80bc0b255d53 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 24 May 2026 10:52:18 -0500 Subject: [PATCH 1/2] feat(pruning): PruningConjunction with per-leaf observation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `PruningConjunction` — an AND of tagged `PruningPredicate` leaves — plus a `PruningObserver` trait fired once per leaf actually evaluated. The AND short-circuits once every container is pruned; short-circuited leaves are not observed, so per-conjunct stats are never biased by predicates that did not run. Construction is via a small builder: let conj = PruningConjunction::builder(schema) .push(filter_id, expr) // runs try_new, skips always-true / errors .build(); // Option `PruningConjunction::single(predicate)` wraps one already-built predicate for the untagged path, which uses `prune()` (a `NoopObserver`) at zero overhead. `combined_orig_expr()` exposes the AND of the leaves' original expressions so consumers can invert the whole predicate (e.g. for fully-matched detection) without re-deriving it. `ConjunctStatsObserver` accumulates `PerConjunctPruneStats { tag, containers_seen, containers_pruned }`. The observer is plumbed as `&mut dyn PruningObserver` so consumers can hold it behind an `Option`. OR and NOT are intentionally *not* structural nodes: they are handled inside a leaf via `PruningPredicate::try_new`, which both prunes better (cross-branch reasoning) and keeps the per-leaf "containers pruned" count sound (it is only monotonic under AND). Per-branch OR short-circuit, where it is useful, belongs to the row-filter decode path over exact masks — not here. Replaces the placeholder-`true`-literal `sub_predicates` design and its double traversal from the earlier draft (#22235). Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/pruning/src/lib.rs | 5 + datafusion/pruning/src/pruning_conjunction.rs | 454 ++++++++++++++++++ 2 files changed, 459 insertions(+) create mode 100644 datafusion/pruning/src/pruning_conjunction.rs diff --git a/datafusion/pruning/src/lib.rs b/datafusion/pruning/src/lib.rs index be17f29eaafa0..97b11ad488fa1 100644 --- a/datafusion/pruning/src/lib.rs +++ b/datafusion/pruning/src/lib.rs @@ -18,9 +18,14 @@ #![cfg_attr(test, allow(clippy::needless_pass_by_value))] mod file_pruner; +mod pruning_conjunction; mod pruning_predicate; pub use file_pruner::FilePruner; +pub use pruning_conjunction::{ + ConjunctStatsObserver, NoopObserver, PerConjunctPruneStats, PruningConjunction, + PruningConjunctionBuilder, PruningObserver, Tag, +}; pub use pruning_predicate::{ PredicateRewriter, PruningPredicate, PruningStatistics, RequiredColumns, UnhandledPredicateHook, build_pruning_predicate, diff --git a/datafusion/pruning/src/pruning_conjunction.rs b/datafusion/pruning/src/pruning_conjunction.rs new file mode 100644 index 0000000000000..39174998e7978 --- /dev/null +++ b/datafusion/pruning/src/pruning_conjunction.rs @@ -0,0 +1,454 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A conjunction (`AND`) of tagged [`PruningPredicate`] leaves with +//! per-leaf observation. +//! +//! [`PruningConjunction`] evaluates each leaf against statistics and +//! ANDs the results, invoking a [`PruningObserver`] each time a leaf +//! is actually evaluated. The AND short-circuits once every container +//! is pruned, and leaves cut off that way are *not* observed — so +//! per-conjunct stats reflect only what was evaluated. +//! +//! ## Why only `AND` (no `OR` / `NOT`)? +//! +//! `OR` and `NOT` are handled *inside* a leaf via +//! [`PruningPredicate::try_new`], not as structural nodes here, for +//! two reasons: +//! +//! * **Better pruning.** `try_new(a < 5 OR b > 100)` reasons across +//! both branches; ORing two separate leaf masks is strictly more +//! conservative. +//! * **Sound stats.** The per-leaf "containers pruned" count is only +//! meaningful when pruning is monotonic, i.e. under `AND` (any leaf +//! pruning a container prunes the whole expression). Under `OR` a +//! leaf only helps where every sibling also prunes, so a per-leaf +//! count would mislead a scheduler. Negating a conservative mask is +//! likewise unsound. +//! +//! Per-branch `OR` short-circuit *is* useful during row-filter decode, +//! where masks are exact (not conservative) — that belongs to a +//! separate decode-path tree, not here. + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow::datatypes::SchemaRef; +use datafusion_common::Result; +use datafusion_common::pruning::PruningStatistics; +use datafusion_physical_plan::PhysicalExpr; +use log::debug; + +use crate::pruning_predicate::PruningPredicate; + +/// Caller-supplied identifier attached to a leaf so consumers can +/// correlate per-leaf observations back to the original conjunct +/// (e.g. an adaptive filter scheduler keys this by `FilterId`). +pub type Tag = u64; + +#[derive(Debug, Clone)] +struct TaggedLeaf { + tag: Option, + predicate: Arc, +} + +/// A conjunction of tagged [`PruningPredicate`] leaves. +/// +/// See the module-level docs for evaluation and short-circuit +/// semantics. Build one with [`PruningConjunction::builder`], or wrap +/// a single existing predicate with [`PruningConjunction::single`]. +#[derive(Debug, Clone)] +pub struct PruningConjunction { + schema: SchemaRef, + leaves: Vec, +} + +/// Observer invoked once per leaf that is actually evaluated against +/// statistics. Leaves skipped by AND short-circuit do not fire it. +/// +/// The default method does nothing, so a no-op observer is just +/// [`NoopObserver`]. +pub trait PruningObserver { + /// Called after a leaf has been evaluated. `mask[i] = true` means + /// container `i` may match this leaf (cannot be pruned); `false` + /// means the leaf alone proves the container can be skipped. + fn on_leaf(&mut self, _tag: Option, _mask: &[bool]) {} +} + +/// Zero-cost no-op observer used by the plain `prune()` path. +#[derive(Debug, Default, Clone, Copy)] +pub struct NoopObserver; + +impl PruningObserver for NoopObserver {} + +/// Per-leaf pruning rate accumulated by [`ConjunctStatsObserver`]. +/// +/// `containers_seen` counts the containers a leaf was evaluated +/// against; `containers_pruned` counts containers the leaf alone +/// proved skippable. Leaves cut off by AND short-circuit are not +/// counted (no event fires for them). +#[derive(Debug, Default, Clone, Copy)] +pub struct PerConjunctPruneStats { + pub tag: Tag, + pub containers_seen: usize, + pub containers_pruned: usize, +} + +impl PerConjunctPruneStats { + /// Pruning rate as a fraction in `[0.0, 1.0]`, or `None` when the + /// leaf has not been observed yet. + pub fn pruning_rate(&self) -> Option { + if self.containers_seen == 0 { + return None; + } + Some(self.containers_pruned as f64 / self.containers_seen as f64) + } +} + +/// Built-in observer accumulating [`PerConjunctPruneStats`] keyed by +/// [`Tag`]. Untagged leaves are ignored. Repeated observations for the +/// same tag (e.g. the same conjunction re-run across files) accumulate +/// additively. +#[derive(Debug, Default)] +pub struct ConjunctStatsObserver { + stats: HashMap, +} + +impl ConjunctStatsObserver { + pub fn new() -> Self { + Self::default() + } + + /// Take ownership of the accumulated stats, sorted by tag for + /// deterministic output. The observer is empty afterwards. + pub fn take(&mut self) -> Vec { + let mut out: Vec<_> = self.stats.drain().map(|(_, v)| v).collect(); + out.sort_by_key(|s| s.tag); + out + } + + /// Borrow the accumulated stats without consuming. + pub fn stats(&self) -> impl Iterator { + self.stats.values() + } +} + +impl PruningObserver for ConjunctStatsObserver { + fn on_leaf(&mut self, tag: Option, mask: &[bool]) { + let Some(tag) = tag else { + return; + }; + let entry = self.stats.entry(tag).or_insert(PerConjunctPruneStats { + tag, + containers_seen: 0, + containers_pruned: 0, + }); + entry.containers_seen += mask.len(); + entry.containers_pruned += mask.iter().filter(|b| !**b).count(); + } +} + +impl PruningConjunction { + /// Wrap a single already-built [`PruningPredicate`] as an untagged + /// one-leaf conjunction. Used by the standard (non-adaptive) + /// pruning path. + pub fn single(predicate: Arc) -> Self { + let schema = Arc::clone(predicate.schema()); + Self { + schema, + leaves: vec![TaggedLeaf { + tag: None, + predicate, + }], + } + } + + /// Start building a conjunction whose leaves are all evaluated + /// against `schema`. + pub fn builder(schema: SchemaRef) -> PruningConjunctionBuilder { + PruningConjunctionBuilder { + schema, + leaves: Vec::new(), + } + } + + /// Schema all leaves were built against. + pub fn schema(&self) -> &SchemaRef { + &self.schema + } + + /// Number of leaves. + pub fn num_leaves(&self) -> usize { + self.leaves.len() + } + + /// Evaluate the conjunction against `statistics`. Equivalent to + /// `prune_with_observer(statistics, &mut NoopObserver)`. + pub fn prune( + &self, + statistics: &S, + ) -> Result> { + self.prune_with_observer(statistics, &mut NoopObserver) + } + + /// Evaluate the conjunction against `statistics`, invoking + /// `observer.on_leaf(tag, mask)` once per leaf actually evaluated. + /// Short-circuits once every container is pruned. + pub fn prune_with_observer( + &self, + statistics: &S, + observer: &mut dyn PruningObserver, + ) -> Result> { + let n = statistics.num_containers(); + let mut combined = vec![true; n]; + for leaf in &self.leaves { + if combined.iter().all(|b| !b) { + // Every container already pruned; remaining leaves + // cannot change the result and are intentionally not + // observed. + break; + } + let mask = leaf.predicate.prune(statistics)?; + observer.on_leaf(leaf.tag, &mask); + for (c, v) in combined.iter_mut().zip(mask.iter()) { + *c = *c && *v; + } + } + Ok(combined) + } + + /// The conjunction's combined original expression — the `AND` of + /// each leaf's [`PruningPredicate::orig_expr`]. Callers that need + /// the *whole* predicate (e.g. to invert it for fully-matched + /// detection) use this rather than re-deriving it, so the + /// conjunction stays the single source of truth. + pub fn combined_orig_expr(&self) -> Arc { + datafusion_physical_expr::conjunction( + self.leaves + .iter() + .map(|l| Arc::clone(l.predicate.orig_expr())) + .collect::>(), + ) + } +} + +/// Builder for [`PruningConjunction`]. Each [`push`](Self::push) runs +/// [`PruningPredicate::try_new`]; conjuncts that error or simplify to +/// always-true are skipped (their tags will not appear in observer +/// output). [`build`](Self::build) returns `None` when no non-trivial +/// leaf survived. +#[derive(Debug)] +pub struct PruningConjunctionBuilder { + schema: SchemaRef, + leaves: Vec, +} + +impl PruningConjunctionBuilder { + /// Add a tagged conjunct. Mutates and returns `&mut self`, so this + /// works both in chains and in loops: + /// + /// ```ignore + /// let mut b = PruningConjunction::builder(schema); + /// for (id, expr) in filters { + /// b.push(id, expr); + /// } + /// let conj = b.build(); + /// ``` + pub fn push(&mut self, tag: Tag, expr: Arc) -> &mut Self { + match PruningPredicate::try_new(expr, Arc::clone(&self.schema)) { + Ok(pp) if !pp.always_true() => { + self.leaves.push(TaggedLeaf { + tag: Some(tag), + predicate: Arc::new(pp), + }); + } + Ok(_) => { + // always-true: contributes nothing; tag intentionally dropped. + } + Err(e) => { + debug!("PruningConjunctionBuilder: skipping conjunct tag={tag}: {e}"); + } + } + self + } + + /// Finish building. Returns `None` if no non-trivial leaf was added. + pub fn build(self) -> Option { + if self.leaves.is_empty() { + return None; + } + Some(PruningConjunction { + schema: self.schema, + leaves: self.leaves, + }) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::array::{ArrayRef, Int32Array}; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::pruning::PruningStatistics; + use datafusion_common::{Column, ScalarValue}; + use datafusion_expr_common::operator::Operator; + use datafusion_physical_expr::expressions::{BinaryExpr, Column as PhysCol, Literal}; + use datafusion_physical_plan::PhysicalExpr; + + use crate::pruning_predicate::PruningPredicate; + + use super::*; + + /// 3-container statistics: container i sees `x` in [mins[i], maxes[i]]. + struct TestStats { + mins: Vec, + maxes: Vec, + } + impl PruningStatistics for TestStats { + fn min_values(&self, _column: &Column) -> Option { + Some(Arc::new(Int32Array::from(self.mins.clone()))) + } + fn max_values(&self, _column: &Column) -> Option { + Some(Arc::new(Int32Array::from(self.maxes.clone()))) + } + fn num_containers(&self) -> usize { + self.mins.len() + } + fn null_counts(&self, _column: &Column) -> Option { + None + } + fn row_counts(&self) -> Option { + None + } + fn contained( + &self, + _column: &Column, + _values: &std::collections::HashSet, + ) -> Option { + None + } + } + + fn schema_x() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, true)])) + } + + fn expr(op: Operator, rhs: i32) -> Arc { + Arc::new(BinaryExpr::new( + Arc::new(PhysCol::new("x", 0)), + op, + Arc::new(Literal::new(ScalarValue::Int32(Some(rhs)))), + )) + } + + fn pp(op: Operator, rhs: i32, schema: &SchemaRef) -> Arc { + Arc::new(PruningPredicate::try_new(expr(op, rhs), Arc::clone(schema)).unwrap()) + } + + #[derive(Default)] + struct RecordingObserver { + events: Vec<(Option, Vec)>, + } + impl PruningObserver for RecordingObserver { + fn on_leaf(&mut self, tag: Option, mask: &[bool]) { + self.events.push((tag, mask.to_vec())); + } + } + + #[test] + fn single_behaves_like_plain_pruning_predicate() { + let schema = schema_x(); + let predicate = pp(Operator::Gt, 5, &schema); // x > 5 + let conj = PruningConjunction::single(Arc::clone(&predicate)); + let stats = TestStats { + mins: vec![0, 6, 10], + maxes: vec![3, 10, 20], + }; + assert_eq!( + conj.prune(&stats).unwrap(), + predicate.prune(&stats).unwrap() + ); + } + + #[test] + fn builder_accumulates_per_conjunct_stats() { + // (x > 5) AND (x < 100): p1 prunes container 0, p2 prunes none. + let schema = schema_x(); + let mut b = PruningConjunction::builder(Arc::clone(&schema)); + b.push(101, expr(Operator::Gt, 5)) + .push(202, expr(Operator::Lt, 100)); + let conj = b.build().expect("non-trivial"); + + let stats = TestStats { + mins: vec![0, 6, 10], + maxes: vec![3, 10, 20], + }; + let mut obs = ConjunctStatsObserver::new(); + let mask = conj.prune_with_observer(&stats, &mut obs).unwrap(); + assert_eq!(mask, vec![false, true, true]); + + let stats = obs.take(); + assert_eq!(stats.len(), 2); + let s101 = stats.iter().find(|s| s.tag == 101).unwrap(); + assert_eq!((s101.containers_seen, s101.containers_pruned), (3, 1)); + let s202 = stats.iter().find(|s| s.tag == 202).unwrap(); + assert_eq!((s202.containers_seen, s202.containers_pruned), (3, 0)); + } + + #[test] + fn and_short_circuit_keeps_stats_unbiased() { + // p1 (x > 100) prunes everything alone; p2 (x < 50) must NOT be + // observed — it never ran. + let schema = schema_x(); + let mut b = PruningConjunction::builder(Arc::clone(&schema)); + b.push(1, expr(Operator::Gt, 100)) + .push(2, expr(Operator::Lt, 50)); + let conj = b.build().unwrap(); + + let stats = TestStats { + mins: vec![0, 0, 0], + maxes: vec![3, 10, 20], + }; + let mut obs = RecordingObserver::default(); + let mask = conj.prune_with_observer(&stats, &mut obs).unwrap(); + assert_eq!(mask, vec![false, false, false]); + assert_eq!(obs.events.len(), 1); + assert_eq!(obs.events[0].0, Some(1)); + } + + #[test] + fn builder_drops_always_true() { + let schema = schema_x(); + let always_true: Arc = + Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))); + let mut b = PruningConjunction::builder(Arc::clone(&schema)); + b.push(1, always_true).push(2, expr(Operator::Gt, 5)); + let conj = b.build().expect("one non-trivial leaf"); + assert_eq!(conj.num_leaves(), 1); + } + + #[test] + fn builder_all_trivial_returns_none() { + let schema = schema_x(); + let always_true: Arc = + Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))); + let mut b = PruningConjunction::builder(schema); + b.push(1, always_true); + assert!(b.build().is_none()); + } +} From 897f9ead7e2b0779f409e700d51d79e279901259 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 24 May 2026 10:52:29 -0500 Subject: [PATCH 2/2] feat(parquet): observer-driven pruning in row-group and page filters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both filters expose a fluent, observer-capable prune op alongside the existing methods (kept as thin shims, deprecatable later): filter.prune_row_groups(&ctx, &conjunction) .with_observer(&mut obs) // optional .prune(); let result = filter.prune_pages(access_plan, &ctx) .with_observer(&mut obs) // optional .prune(); The four always-together refs (arrow/parquet schema, metadata/groups, metrics) are bundled into `RowGroupPruningContext` / `PagePruningContext` so no method carries a long argument list. The observer-accepting ops are `pub(crate)` — the consumer (adaptive parquet scan) is in-crate — so the public API stays `prune_by_statistics` / `prune_plan_with_page_index`, unchanged. `RowGroupAccessPlanFilter`: `prune_by_statistics` now wraps the predicate via `PruningConjunction::single` and delegates; fully-matched detection derives the combined expression from the conjunction's `combined_orig_expr()` rather than taking a separate predicate argument. `PagePruningAccessPlanFilter`: predicates carry an optional `Tag` internally (`TaggedPagePredicate`); a builder replaces the old tuple-slice `new_tagged`. The page-index loop emits one observer event per leaf actually evaluated — leaves cut off by the `!selects_any` short-circuit are not observed (resolves reviewer Q2 on #22235: stats are not biased by predicates that never ran). The untagged and observer paths share one body (`run_prune`); no duplicated method. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../datasource-parquet/src/opener/mod.rs | 17 +- .../datasource-parquet/src/page_filter.rs | 284 ++++++++++++++--- .../src/row_group_filter.rs | 287 ++++++++++++++---- 3 files changed, 479 insertions(+), 109 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index f138a26bf4701..116a831aa41d4 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -25,7 +25,7 @@ use self::early_stop::EarlyStoppingStream; use self::encryption::EncryptionContext; use crate::access_plan::PreparedAccessPlan; use crate::decoder_projection::DecoderProjection; -use crate::page_filter::PagePruningAccessPlanFilter; +use crate::page_filter::{PagePruningAccessPlanFilter, PagePruningContext}; use crate::push_decoder::{DecoderBuilderConfig, PushDecoderStreamState}; use crate::row_filter::RowFilterGenerator; use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter}; @@ -1103,14 +1103,15 @@ impl RowGroupsPrunedParquetOpen { && !access_plan.is_empty() && let Some(page_pruning_predicate) = page_pruning_predicate { + let ctx = PagePruningContext { + arrow_schema: &prepared.physical_file_schema, + parquet_schema: reader_metadata.parquet_schema(), + parquet_metadata: file_metadata.as_ref(), + file_metrics: &prepared.file_metrics, + }; let page_pruning_result = page_pruning_predicate - .prune_plan_with_page_index_and_metrics( - access_plan, - &prepared.physical_file_schema, - reader_metadata.parquet_schema(), - file_metadata.as_ref(), - &prepared.file_metrics, - ); + .prune_pages(access_plan, &ctx) + .prune(); access_plan = page_pruning_result.access_plan; ParquetFileMetrics::add_page_index_pages_skipped_by_fully_matched( &prepared.metrics, diff --git a/datafusion/datasource-parquet/src/page_filter.rs b/datafusion/datasource-parquet/src/page_filter.rs index 795a63268b6a9..c087a2e0f029e 100644 --- a/datafusion/datasource-parquet/src/page_filter.rs +++ b/datafusion/datasource-parquet/src/page_filter.rs @@ -31,7 +31,7 @@ use arrow::{ use datafusion_common::ScalarValue; use datafusion_common::pruning::PruningStatistics; use datafusion_physical_expr::{PhysicalExpr, split_conjunction}; -use datafusion_pruning::PruningPredicate; +use datafusion_pruning::{NoopObserver, PruningObserver, PruningPredicate, Tag}; use log::{debug, trace}; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; @@ -110,9 +110,61 @@ use parquet::{ /// row selection that is added to the [`ParquetAccessPlan`]. #[derive(Debug)] pub struct PagePruningAccessPlanFilter { - /// single column predicates (e.g. (`col = 5`) extracted from the overall - /// predicate. Must all be true for a row to be included in the result. - predicates: Vec, + /// Single-column conjuncts split from the top-level `AND` of the + /// overall predicate (via `split_conjunction`), keeping only those + /// that build into a non-trivial single-column `PruningPredicate` + /// (e.g. `col = 5`, `col < 10`, or a same-column `col < 5 OR col > 100`). + /// Multi-column, always-true, and non-buildable conjuncts are + /// dropped, so this is a *necessary-but-not-sufficient* subset of + /// the filter: every entry must hold for a row to be included, but + /// satisfying all of them does not by itself guarantee inclusion. + /// Page pruning ANDs each entry's per-page selection — a page is + /// kept only if it may satisfy all of them. + /// + /// Each carries an optional caller [`Tag`]; when present, the + /// page-index evaluation loop fires `observer.on_leaf(tag, ..)` + /// after evaluating that predicate. Untagged predicates produce + /// `None` tags and no-op against `NoopObserver`. + predicates: Vec, +} + +/// Single-column predicate paired with an optional caller tag. +#[derive(Debug)] +struct TaggedPagePredicate { + tag: Option, + predicate: PruningPredicate, +} + +/// Builder for a tagged [`PagePruningAccessPlanFilter`]. Each +/// [`push`](Self::push) runs the same single-column / non-trivial +/// filtering as [`PagePruningAccessPlanFilter::new`]; conjuncts that +/// fail it are dropped (their tags will not appear in observer +/// output). [`build`](Self::build) always succeeds — an empty filter +/// is a valid no-op pruner. +#[derive(Debug)] +pub struct PagePruningAccessPlanFilterBuilder { + schema: SchemaRef, + predicates: Vec, +} + +impl PagePruningAccessPlanFilterBuilder { + /// Add a tagged conjunct. Mutates and returns `&mut self`, so this + /// works in both chains and loops. + pub fn push(&mut self, tag: Tag, expr: Arc) -> &mut Self { + if let Some(p) = + PagePruningAccessPlanFilter::build_one(expr, &self.schema, Some(tag)) + { + self.predicates.push(p); + } + self + } + + /// Finish building. + pub fn build(self) -> PagePruningAccessPlanFilter { + PagePruningAccessPlanFilter { + predicates: self.predicates, + } + } } /// Result of applying page-index pruning to a [`ParquetAccessPlan`]. @@ -135,44 +187,129 @@ impl PagePruningResult { } } +/// The always-together context for a page-index prune. Bundled so the +/// prune entry points don't carry four loose refs. +#[derive(Clone, Copy)] +pub(crate) struct PagePruningContext<'a> { + pub arrow_schema: &'a Schema, + pub parquet_schema: &'a SchemaDescriptor, + pub parquet_metadata: &'a ParquetMetaData, + pub file_metrics: &'a ParquetFileMetrics, +} + +/// A configured-but-not-yet-run page-index prune, produced by +/// [`PagePruningAccessPlanFilter::prune_pages`]. Optionally attach an +/// observer with [`with_observer`](Self::with_observer), then execute +/// with [`prune`](Self::prune). +pub(crate) struct PagePrune<'a> { + filter: &'a PagePruningAccessPlanFilter, + access_plan: ParquetAccessPlan, + ctx: &'a PagePruningContext<'a>, + observer: Option<&'a mut dyn PruningObserver>, +} + +impl<'a> PagePrune<'a> { + /// Attach a per-leaf observer. Omit to prune without collecting + /// per-conjunct stats (zero overhead). + // Consumed by the adaptive parquet scan (later in the stack); the + // untagged `prune_plan_with_page_index` path never sets an observer. + #[expect( + dead_code, + reason = "tagged page-prune consumer lands later in the stack" + )] + pub fn with_observer(mut self, observer: &'a mut dyn PruningObserver) -> Self { + self.observer = Some(observer); + self + } + + /// Run the prune, returning the updated access plan and metrics. + pub fn prune(self) -> PagePruningResult { + let PagePrune { + filter, + access_plan, + ctx, + observer, + } = self; + let mut noop = NoopObserver; + let observer: &mut dyn PruningObserver = match observer { + Some(o) => o, + None => &mut noop, + }; + filter.run_prune(access_plan, ctx, observer) + } +} + impl PagePruningAccessPlanFilter { /// Create a new [`PagePruningAccessPlanFilter`] from a physical - /// expression. + /// expression. Predicates created this way have no caller tag. #[expect(clippy::needless_pass_by_value)] pub fn new(expr: &Arc, schema: SchemaRef) -> Self { - // extract any single column predicates let predicates = split_conjunction(expr) .into_iter() - .filter_map(|predicate| { - let pp = match PruningPredicate::try_new( - Arc::clone(predicate), - Arc::clone(&schema), - ) { - Ok(pp) => pp, - Err(e) => { - debug!("Ignoring error creating page pruning predicate: {e}"); - return None; - } - }; - - if pp.always_true() { - debug!("Ignoring always true page pruning predicate: {predicate}"); - return None; - } - - if pp.required_columns().single_column().is_none() { - debug!("Ignoring multi-column page pruning predicate: {predicate}"); - return None; - } - - Some(pp) - }) + .filter_map(|predicate| Self::build_one(Arc::clone(predicate), &schema, None)) .collect::>(); Self { predicates } } + /// Start building a filter whose conjuncts each carry a + /// caller-supplied [`Tag`]. Mirrors [`PruningConjunction::builder`]: + /// + /// ```ignore + /// let mut b = PagePruningAccessPlanFilter::builder(schema); + /// for (id, expr) in conjuncts { + /// b.push(id, expr); + /// } + /// let filter = b.build(); + /// ``` + /// + /// During pruning the page-index loop fires `observer.on_leaf(tag, + /// ..)` once per leaf actually evaluated; leaves cut off by the + /// AND short-circuit on row selection (`!selects_any`) are not + /// observed. + /// + /// [`PruningConjunction::builder`]: datafusion_pruning::PruningConjunction::builder + pub fn builder(schema: SchemaRef) -> PagePruningAccessPlanFilterBuilder { + PagePruningAccessPlanFilterBuilder { + schema, + predicates: Vec::new(), + } + } + + fn build_one( + expr: Arc, + schema: &SchemaRef, + tag: Option, + ) -> Option { + let pp = match PruningPredicate::try_new(expr, Arc::clone(schema)) { + Ok(pp) => pp, + Err(e) => { + debug!("Ignoring error creating page pruning predicate: {e}"); + return None; + } + }; + if pp.always_true() { + debug!( + "Ignoring always true page pruning predicate: {}", + pp.orig_expr() + ); + return None; + } + if pp.required_columns().single_column().is_none() { + debug!( + "Ignoring multi-column page pruning predicate: {}", + pp.orig_expr() + ); + return None; + } + Some(TaggedPagePredicate { tag, predicate: pp }) + } + /// Returns an updated [`ParquetAccessPlan`] by applying predicates to the - /// parquet page index, if any + /// parquet page index, if any. + /// + /// Thin shim over the fluent `prune_pages` op (no observer). + /// Could be `#[deprecated]` in favor of the fluent form once the + /// adaptive scan migrates. pub fn prune_plan_with_page_index( &self, access_plan: ParquetAccessPlan, @@ -181,26 +318,55 @@ impl PagePruningAccessPlanFilter { parquet_metadata: &ParquetMetaData, file_metrics: &ParquetFileMetrics, ) -> ParquetAccessPlan { - self.prune_plan_with_page_index_and_metrics( - access_plan, + let ctx = PagePruningContext { arrow_schema, parquet_schema, parquet_metadata, file_metrics, - ) - .access_plan + }; + self.prune_pages(access_plan, &ctx).prune().access_plan + } + + /// Begin a page-index prune. Returns a [`PagePrune`] op that can + /// optionally take an observer before running: + /// + /// ```ignore + /// let result = filter.prune_pages(access_plan, &ctx) + /// .with_observer(&mut obs) // optional + /// .prune(); + /// ``` + pub(crate) fn prune_pages<'a>( + &'a self, + access_plan: ParquetAccessPlan, + ctx: &'a PagePruningContext<'a>, + ) -> PagePrune<'a> { + PagePrune { + filter: self, + access_plan, + ctx, + observer: None, + } } - /// Returns an updated [`ParquetAccessPlan`] and metrics by applying predicates - /// to the parquet page index, if any. - pub(crate) fn prune_plan_with_page_index_and_metrics( + /// Workhorse for page-index pruning. The observer fires + /// `on_leaf(tag, mask)` once per predicate that is actually + /// evaluated; `mask[i] = true` means row group `i` still has at + /// least one page that may match that leaf. Predicates skipped by + /// the per-row-group `!selects_any` short-circuit are not observed, + /// so per-conjunct stats are not biased by predicates that never + /// ran (resolves the reviewer's Q2 concern on PR #22235). + fn run_prune( &self, mut access_plan: ParquetAccessPlan, - arrow_schema: &Schema, - parquet_schema: &SchemaDescriptor, - parquet_metadata: &ParquetMetaData, - file_metrics: &ParquetFileMetrics, + ctx: &PagePruningContext<'_>, + observer: &mut dyn PruningObserver, ) -> PagePruningResult { + let PagePruningContext { + arrow_schema, + parquet_schema, + parquet_metadata, + file_metrics, + } = *ctx; // scoped timer updates on drop let _timer_guard = file_metrics.page_index_eval_time.timer(); if self.predicates.is_empty() { @@ -210,6 +376,16 @@ impl PagePruningAccessPlanFilter { let page_index_predicates = &self.predicates; let groups = parquet_metadata.row_groups(); + // Per-leaf "did this row group still have any matching pages + // after the leaf alone?" mask. Built across all row groups, + // emitted via `observer.on_leaf` at the end so each leaf gets + // exactly one observation per call to this function. Leaves + // never evaluated (because an earlier conjunct emptied the + // running row selection — `!selects_any` break below) end up + // with `None` here and are intentionally not observed. + let mut per_leaf_mask: Vec>> = + (0..page_index_predicates.len()).map(|_| None).collect(); + if groups.is_empty() { return PagePruningResult::new(access_plan, 0); } @@ -262,7 +438,8 @@ impl PagePruningAccessPlanFilter { let mut matched_pages_in_group: HashSet = HashSet::from_iter(0..total_pages_in_group); - for predicate in page_index_predicates { + for (leaf_idx, tagged) in page_index_predicates.iter().enumerate() { + let predicate = &tagged.predicate; let Some(column) = predicate.required_columns().single_column() else { debug!( "Ignoring multi-column page pruning predicate: {:?}", @@ -307,6 +484,15 @@ impl PagePruningAccessPlanFilter { predicate.predicate_expr(), ); + // Per-leaf observation: this leaf ran for `row_group_index` + // and produced `selection`. Whether this row group is + // "kept" by the leaf is `selection.selects_any()`. The + // entry is created lazily so leaves never observed (due + // to short-circuit) stay `None`. + let mask = per_leaf_mask[leaf_idx] + .get_or_insert_with(|| vec![false; groups.len()]); + mask[row_group_index] = selection.selects_any(); + let matched_pages_indexes: HashSet<_> = pages .into_iter() .enumerate() @@ -369,6 +555,18 @@ impl PagePruningAccessPlanFilter { file_metrics .page_index_pages_pruned .add_matched(total_pages_select); + + // Emit one observer event per leaf that was actually + // evaluated against at least one row group. Leaves that the + // outer `!selects_any` short-circuit prevented from running + // stay `None` here and are correctly absent from the stats. + for (leaf_idx, mask_opt) in per_leaf_mask.into_iter().enumerate() { + if let Some(mask) = mask_opt { + let tag = page_index_predicates[leaf_idx].tag; + observer.on_leaf(tag, &mask); + } + } + PagePruningResult::new(access_plan, total_pages_skipped_by_fully_matched) } diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index 07f4fe92cf308..f52fc6978aeea 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -23,7 +23,7 @@ use super::{ParquetAccessPlan, ParquetFileMetrics}; // path keeps resolving for in-crate callers (e.g. `opener`). pub(crate) use crate::bloom_filter::BloomFilterStatistics; use arrow::array::{ArrayRef, BooleanArray, UInt64Array}; -use arrow::datatypes::Schema; +use arrow::datatypes::{Schema, SchemaRef}; use datafusion_common::pruning::PruningStatistics; use datafusion_common::{Column, Result, ScalarValue}; use datafusion_datasource::FileRange; @@ -31,7 +31,9 @@ use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{BinaryExpr, IsNullExpr, NotExpr}; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprSimplifier}; -use datafusion_pruning::PruningPredicate; +use datafusion_pruning::{ + NoopObserver, PruningConjunction, PruningObserver, PruningPredicate, +}; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::file::metadata::RowGroupMetaData; use parquet::schema::types::SchemaDescriptor; @@ -48,6 +50,116 @@ pub struct RowGroupAccessPlanFilter { access_plan: ParquetAccessPlan, } +/// The always-together context for a statistics-based row-group prune. +/// Bundled so the prune entry points don't carry four loose refs. +#[derive(Clone, Copy)] +pub(crate) struct RowGroupPruningContext<'a> { + pub arrow_schema: &'a Schema, + pub parquet_schema: &'a SchemaDescriptor, + pub groups: &'a [RowGroupMetaData], + pub metrics: &'a ParquetFileMetrics, +} + +/// A configured-but-not-yet-run statistics prune, produced by +/// [`RowGroupAccessPlanFilter::prune_row_groups`]. Optionally attach an +/// observer with [`with_observer`](Self::with_observer), then execute +/// with [`prune`](Self::prune). +pub(crate) struct RowGroupStatisticsPrune<'a> { + filter: &'a mut RowGroupAccessPlanFilter, + ctx: &'a RowGroupPruningContext<'a>, + conjunction: &'a PruningConjunction, + observer: Option<&'a mut dyn PruningObserver>, +} + +impl<'a> RowGroupStatisticsPrune<'a> { + /// Attach a per-leaf observer. Omit this call to prune without + /// collecting per-conjunct stats (zero overhead). + // Exercised by tests today; the prod consumer (adaptive parquet + // scan) lands later in the stack. `expect` only in non-test builds, + // where it is genuinely unused. + #[cfg_attr( + not(test), + expect( + dead_code, + reason = "tagged row-group consumer lands later in the stack" + ) + )] + pub fn with_observer(mut self, observer: &'a mut dyn PruningObserver) -> Self { + self.observer = Some(observer); + self + } + + /// Run the prune, updating the underlying access plan in place. + pub fn prune(self) { + let RowGroupStatisticsPrune { + filter, + ctx, + conjunction, + observer, + } = self; + + // scoped timer updates on drop + let _timer_guard = ctx.metrics.statistics_eval_time.timer(); + + assert_eq!(ctx.groups.len(), filter.access_plan.len()); + // Indexes of row groups still to scan + let row_group_indexes = filter.access_plan.row_group_indexes(); + let row_group_metadatas = row_group_indexes + .iter() + .map(|&i| &ctx.groups[i]) + .collect::>(); + + let pruning_stats = RowGroupPruningStatistics { + parquet_schema: ctx.parquet_schema, + row_group_metadatas, + arrow_schema: ctx.arrow_schema, + // Preserve the existing row-group pruning behavior. This path only + // proves whether matching rows may exist, so it uses the + // StatisticsConverter default for older parquet-rs files where a + // missing null count can mean there are zero nulls. + missing_null_counts_as_zero: true, + }; + + let mut noop = NoopObserver; + let observer: &mut dyn PruningObserver = match observer { + Some(o) => o, + None => &mut noop, + }; + + match conjunction.prune_with_observer(&pruning_stats, observer) { + Ok(values) => { + let mut fully_contained_candidates_original_idx: Vec = Vec::new(); + for (idx, &value) in row_group_indexes.iter().zip(values.iter()) { + if !value { + filter.access_plan.skip(*idx); + ctx.metrics.row_groups_pruned_statistics.add_pruned(1); + } else { + ctx.metrics.row_groups_pruned_statistics.add_matched(1); + fully_contained_candidates_original_idx.push(*idx); + } + } + + // Fully-matched detection inverts the *combined* + // expression; the conjunction is the single source of + // truth for it. The observer has already fired for the + // per-leaf evaluation pass. + let combined_orig_expr = conjunction.combined_orig_expr(); + filter.identify_fully_matched_row_groups( + &fully_contained_candidates_original_idx, + ctx, + &combined_orig_expr, + conjunction.schema(), + ); + } + // stats filter array could not be built, so we can't prune + Err(e) => { + log::debug!("Error evaluating row group predicate values {e}"); + ctx.metrics.predicate_evaluation_errors.add(1); + } + } + } +} + impl RowGroupAccessPlanFilter { /// Create a new `RowGroupPlanBuilder` for pruning out the groups to scan /// based on metadata and statistics @@ -263,57 +375,38 @@ impl RowGroupAccessPlanFilter { predicate: &PruningPredicate, metrics: &ParquetFileMetrics, ) { - // scoped timer updates on drop - let _timer_guard = metrics.statistics_eval_time.timer(); - - assert_eq!(groups.len(), self.access_plan.len()); - // Indexes of row groups still to scan - let row_group_indexes = self.access_plan.row_group_indexes(); - let row_group_metadatas = row_group_indexes - .iter() - .map(|&i| &groups[i]) - .collect::>(); - - let pruning_stats = RowGroupPruningStatistics { - parquet_schema, - row_group_metadatas, + // Thin shim over the fluent `prune_row_groups` op (no observer). + // Could be `#[deprecated]` in favor of the fluent form once the + // adaptive scan migrates. + let conjunction = PruningConjunction::single(Arc::new(predicate.clone())); + let ctx = RowGroupPruningContext { arrow_schema, - // Preserve the existing row-group pruning behavior. This path only - // proves whether matching rows may exist, so it uses the - // StatisticsConverter default for older parquet-rs files where a - // missing null count can mean there are zero nulls. - missing_null_counts_as_zero: true, + parquet_schema, + groups, + metrics, }; + self.prune_row_groups(&ctx, &conjunction).prune(); + } - // try to prune the row groups in a single call - match predicate.prune(&pruning_stats) { - Ok(values) => { - let mut fully_contained_candidates_original_idx: Vec = Vec::new(); - for (idx, &value) in row_group_indexes.iter().zip(values.iter()) { - if !value { - self.access_plan.skip(*idx); - metrics.row_groups_pruned_statistics.add_pruned(1); - } else { - metrics.row_groups_pruned_statistics.add_matched(1); - fully_contained_candidates_original_idx.push(*idx); - } - } - - // Check if any of the matched row groups are fully contained by the predicate - self.identify_fully_matched_row_groups( - &fully_contained_candidates_original_idx, - arrow_schema, - parquet_schema, - groups, - predicate, - metrics, - ); - } - // stats filter array could not be built, so we can't prune - Err(e) => { - log::debug!("Error evaluating row group predicate values {e}"); - metrics.predicate_evaluation_errors.add(1); - } + /// Begin a statistics-based row-group prune. Returns a + /// [`RowGroupStatisticsPrune`] op that can optionally take an + /// observer before running: + /// + /// ```ignore + /// filter.prune_row_groups(&ctx, &conjunction) + /// .with_observer(&mut obs) // optional + /// .prune(); + /// ``` + pub(crate) fn prune_row_groups<'a>( + &'a mut self, + ctx: &'a RowGroupPruningContext<'a>, + conjunction: &'a PruningConjunction, + ) -> RowGroupStatisticsPrune<'a> { + RowGroupStatisticsPrune { + filter: self, + ctx, + conjunction, + observer: None, } } @@ -328,25 +421,29 @@ impl RowGroupAccessPlanFilter { fn identify_fully_matched_row_groups( &mut self, candidate_row_group_indices: &[usize], - arrow_schema: &Schema, - parquet_schema: &SchemaDescriptor, - groups: &[RowGroupMetaData], - predicate: &PruningPredicate, - metrics: &ParquetFileMetrics, + ctx: &RowGroupPruningContext<'_>, + combined_orig_expr: &Arc, + schema: &SchemaRef, ) { if candidate_row_group_indices.is_empty() { return; } + let RowGroupPruningContext { + arrow_schema, + parquet_schema, + groups, + metrics, + } = *ctx; let mut inverted_expr: Arc = - Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr()))); + Arc::new(NotExpr::new(Arc::clone(combined_orig_expr))); // Rows where the predicate evaluates to NULL do not pass the filter. // Include NULL checks in the inverted expression so a row group is only // considered fully matched when every referenced column is known non-null. // This is conservative for null-accepting predicates, but fully matched // row groups must not have false positives. - let mut columns = collect_columns(predicate.orig_expr()) + let mut columns = collect_columns(combined_orig_expr) .into_iter() .filter(|column| arrow_schema.field(column.index()).is_nullable()) .collect::>(); @@ -372,7 +469,7 @@ impl RowGroupAccessPlanFilter { }; let Ok(inverted_predicate) = - PruningPredicate::try_new(inverted_expr, Arc::clone(predicate.schema())) + PruningPredicate::try_new(inverted_expr, Arc::clone(schema)) else { return; }; @@ -642,6 +739,80 @@ mod tests { assert_pruned(row_groups, ExpectedPruning::Some(vec![1])) } + #[test] + fn row_group_observer_captures_per_conjunct_stats() { + // c1 > 15 AND c1 < 50, with caller filter ids attached so we + // can read per-conjunct stats from the observer. With two + // row groups [1,10] and [11,20]: + // - leaf "c1 > 15" prunes rg0 (max=10<=15); keeps rg1 + // - leaf "c1 < 50" keeps both + use datafusion_expr::{col, lit}; + use datafusion_physical_expr::PhysicalExpr; + use datafusion_pruning::{ConjunctStatsObserver, PruningConjunction}; + + let schema = + Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)])); + let expr_gt15: Arc = + logical2physical(&col("c1").gt(lit(15)), &schema); + let expr_lt50: Arc = + logical2physical(&col("c1").lt(lit(50)), &schema); + + let mut builder = PruningConjunction::builder(Arc::clone(&schema)); + builder.push(101, expr_gt15).push(202, expr_lt50); + let conjunction = builder.build().expect("non-trivial"); + + let field = PrimitiveTypeField::new("c1", PhysicalType::INT32); + let schema_descr = get_test_schema_descr(vec![field]); + let rgm1 = get_row_group_meta_data( + &schema_descr, + vec![ParquetStatistics::int32( + Some(1), + Some(10), + None, + Some(0), + false, + )], + ); + let rgm2 = get_row_group_meta_data( + &schema_descr, + vec![ParquetStatistics::int32( + Some(11), + Some(20), + None, + Some(0), + false, + )], + ); + + let metrics = parquet_file_metrics(); + let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2)); + let mut observer = ConjunctStatsObserver::new(); + let groups = [rgm1, rgm2]; + let ctx = RowGroupPruningContext { + arrow_schema: &schema, + parquet_schema: &schema_descr, + groups: &groups, + metrics: &metrics, + }; + row_groups + .prune_row_groups(&ctx, &conjunction) + .with_observer(&mut observer) + .prune(); + + // Same row-group pruning result as the untagged path. + assert_pruned(row_groups, ExpectedPruning::Some(vec![1])); + + // Per-conjunct stats are surfaced by the observer. + let stats = observer.take(); + assert_eq!(stats.len(), 2); + let s101 = stats.iter().find(|s| s.tag == 101).unwrap(); + assert_eq!(s101.containers_seen, 2); + assert_eq!(s101.containers_pruned, 1); + let s202 = stats.iter().find(|s| s.tag == 202).unwrap(); + assert_eq!(s202.containers_seen, 2); + assert_eq!(s202.containers_pruned, 0); + } + #[test] fn row_group_fully_matched_requires_known_non_null_predicate_columns() { use datafusion_expr::{col, lit};