Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions datafusion/datasource-parquet/src/opener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use self::early_stop::EarlyStoppingStream;
use self::encryption::EncryptionContext;
use crate::access_plan::PreparedAccessPlan;
use crate::decoder_projection::DecoderProjection;
use crate::page_filter::PagePruningAccessPlanFilter;
use crate::page_filter::{PagePruningAccessPlanFilter, PagePruningContext};
use crate::push_decoder::{DecoderBuilderConfig, PushDecoderStreamState};
use crate::row_filter::RowFilterGenerator;
use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter};
Expand Down Expand Up @@ -1103,14 +1103,15 @@ impl RowGroupsPrunedParquetOpen {
&& !access_plan.is_empty()
&& let Some(page_pruning_predicate) = page_pruning_predicate
{
let ctx = PagePruningContext {
arrow_schema: &prepared.physical_file_schema,
parquet_schema: reader_metadata.parquet_schema(),
parquet_metadata: file_metadata.as_ref(),
file_metrics: &prepared.file_metrics,
};
let page_pruning_result = page_pruning_predicate
.prune_plan_with_page_index_and_metrics(
access_plan,
&prepared.physical_file_schema,
reader_metadata.parquet_schema(),
file_metadata.as_ref(),
&prepared.file_metrics,
);
.prune_pages(access_plan, &ctx)
.prune();
access_plan = page_pruning_result.access_plan;
ParquetFileMetrics::add_page_index_pages_skipped_by_fully_matched(
&prepared.metrics,
Expand Down
284 changes: 241 additions & 43 deletions datafusion/datasource-parquet/src/page_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use arrow::{
use datafusion_common::ScalarValue;
use datafusion_common::pruning::PruningStatistics;
use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
use datafusion_pruning::PruningPredicate;
use datafusion_pruning::{NoopObserver, PruningObserver, PruningPredicate, Tag};

use log::{debug, trace};
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
Expand Down Expand Up @@ -110,9 +110,61 @@ use parquet::{
/// row selection that is added to the [`ParquetAccessPlan`].
#[derive(Debug)]
pub struct PagePruningAccessPlanFilter {
/// single column predicates (e.g. (`col = 5`) extracted from the overall
/// predicate. Must all be true for a row to be included in the result.
predicates: Vec<PruningPredicate>,
/// Single-column conjuncts split from the top-level `AND` of the
/// overall predicate (via `split_conjunction`), keeping only those
/// that build into a non-trivial single-column `PruningPredicate`
/// (e.g. `col = 5`, `col < 10`, or a same-column `col < 5 OR col > 100`).
/// Multi-column, always-true, and non-buildable conjuncts are
/// dropped, so this is a *necessary-but-not-sufficient* subset of
/// the filter: every entry must hold for a row to be included, but
/// satisfying all of them does not by itself guarantee inclusion.
/// Page pruning ANDs each entry's per-page selection — a page is
/// kept only if it may satisfy all of them.
///
/// Each carries an optional caller [`Tag`]; when present, the
/// page-index evaluation loop fires `observer.on_leaf(tag, ..)`
/// after evaluating that predicate. Untagged predicates produce
/// `None` tags and no-op against `NoopObserver`.
predicates: Vec<TaggedPagePredicate>,
}

/// Single-column predicate paired with an optional caller tag.
#[derive(Debug)]
struct TaggedPagePredicate {
tag: Option<Tag>,
predicate: PruningPredicate,
}

/// Builder for a tagged [`PagePruningAccessPlanFilter`]. Each
/// [`push`](Self::push) runs the same single-column / non-trivial
/// filtering as [`PagePruningAccessPlanFilter::new`]; conjuncts that
/// fail it are dropped (their tags will not appear in observer
/// output). [`build`](Self::build) always succeeds — an empty filter
/// is a valid no-op pruner.
#[derive(Debug)]
pub struct PagePruningAccessPlanFilterBuilder {
schema: SchemaRef,
predicates: Vec<TaggedPagePredicate>,
}

impl PagePruningAccessPlanFilterBuilder {
/// Add a tagged conjunct. Mutates and returns `&mut self`, so this
/// works in both chains and loops.
pub fn push(&mut self, tag: Tag, expr: Arc<dyn PhysicalExpr>) -> &mut Self {
if let Some(p) =
PagePruningAccessPlanFilter::build_one(expr, &self.schema, Some(tag))
{
self.predicates.push(p);
}
self
}

/// Finish building.
pub fn build(self) -> PagePruningAccessPlanFilter {
PagePruningAccessPlanFilter {
predicates: self.predicates,
}
}
}

/// Result of applying page-index pruning to a [`ParquetAccessPlan`].
Expand All @@ -135,44 +187,129 @@ impl PagePruningResult {
}
}

/// The always-together context for a page-index prune. Bundled so the
/// prune entry points don't carry four loose refs.
#[derive(Clone, Copy)]
pub(crate) struct PagePruningContext<'a> {
pub arrow_schema: &'a Schema,
pub parquet_schema: &'a SchemaDescriptor,
pub parquet_metadata: &'a ParquetMetaData,
pub file_metrics: &'a ParquetFileMetrics,
}

/// A configured-but-not-yet-run page-index prune, produced by
/// [`PagePruningAccessPlanFilter::prune_pages`]. Optionally attach an
/// observer with [`with_observer`](Self::with_observer), then execute
/// with [`prune`](Self::prune).
pub(crate) struct PagePrune<'a> {
filter: &'a PagePruningAccessPlanFilter,
access_plan: ParquetAccessPlan,
ctx: &'a PagePruningContext<'a>,
observer: Option<&'a mut dyn PruningObserver>,
}

impl<'a> PagePrune<'a> {
/// Attach a per-leaf observer. Omit to prune without collecting
/// per-conjunct stats (zero overhead).
// Consumed by the adaptive parquet scan (later in the stack); the
// untagged `prune_plan_with_page_index` path never sets an observer.
#[expect(
dead_code,
reason = "tagged page-prune consumer lands later in the stack"
)]
pub fn with_observer(mut self, observer: &'a mut dyn PruningObserver) -> Self {
self.observer = Some(observer);
self
}

/// Run the prune, returning the updated access plan and metrics.
pub fn prune(self) -> PagePruningResult {
let PagePrune {
filter,
access_plan,
ctx,
observer,
} = self;
let mut noop = NoopObserver;
let observer: &mut dyn PruningObserver = match observer {
Some(o) => o,
None => &mut noop,
};
filter.run_prune(access_plan, ctx, observer)
}
}

impl PagePruningAccessPlanFilter {
/// Create a new [`PagePruningAccessPlanFilter`] from a physical
/// expression.
/// expression. Predicates created this way have no caller tag.
#[expect(clippy::needless_pass_by_value)]
pub fn new(expr: &Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Self {
// extract any single column predicates
let predicates = split_conjunction(expr)
.into_iter()
.filter_map(|predicate| {
let pp = match PruningPredicate::try_new(
Arc::clone(predicate),
Arc::clone(&schema),
) {
Ok(pp) => pp,
Err(e) => {
debug!("Ignoring error creating page pruning predicate: {e}");
return None;
}
};

if pp.always_true() {
debug!("Ignoring always true page pruning predicate: {predicate}");
return None;
}

if pp.required_columns().single_column().is_none() {
debug!("Ignoring multi-column page pruning predicate: {predicate}");
return None;
}

Some(pp)
})
.filter_map(|predicate| Self::build_one(Arc::clone(predicate), &schema, None))
.collect::<Vec<_>>();
Self { predicates }
}

/// Start building a filter whose conjuncts each carry a
/// caller-supplied [`Tag`]. Mirrors [`PruningConjunction::builder`]:
///
/// ```ignore
/// let mut b = PagePruningAccessPlanFilter::builder(schema);
/// for (id, expr) in conjuncts {
/// b.push(id, expr);
/// }
/// let filter = b.build();
/// ```
///
/// During pruning the page-index loop fires `observer.on_leaf(tag,
/// ..)` once per leaf actually evaluated; leaves cut off by the
/// AND short-circuit on row selection (`!selects_any`) are not
/// observed.
///
/// [`PruningConjunction::builder`]: datafusion_pruning::PruningConjunction::builder
pub fn builder(schema: SchemaRef) -> PagePruningAccessPlanFilterBuilder {
PagePruningAccessPlanFilterBuilder {
schema,
predicates: Vec::new(),
}
}

fn build_one(
expr: Arc<dyn PhysicalExpr>,
schema: &SchemaRef,
tag: Option<Tag>,
) -> Option<TaggedPagePredicate> {
let pp = match PruningPredicate::try_new(expr, Arc::clone(schema)) {
Ok(pp) => pp,
Err(e) => {
debug!("Ignoring error creating page pruning predicate: {e}");
return None;
}
};
if pp.always_true() {
debug!(
"Ignoring always true page pruning predicate: {}",
pp.orig_expr()
);
return None;
}
if pp.required_columns().single_column().is_none() {
debug!(
"Ignoring multi-column page pruning predicate: {}",
pp.orig_expr()
);
return None;
}
Some(TaggedPagePredicate { tag, predicate: pp })
}

/// Returns an updated [`ParquetAccessPlan`] by applying predicates to the
/// parquet page index, if any
/// parquet page index, if any.
///
/// Thin shim over the fluent `prune_pages` op (no observer).
/// Could be `#[deprecated]` in favor of the fluent form once the
/// adaptive scan migrates.
pub fn prune_plan_with_page_index(
&self,
access_plan: ParquetAccessPlan,
Expand All @@ -181,26 +318,55 @@ impl PagePruningAccessPlanFilter {
parquet_metadata: &ParquetMetaData,
file_metrics: &ParquetFileMetrics,
) -> ParquetAccessPlan {
self.prune_plan_with_page_index_and_metrics(
access_plan,
let ctx = PagePruningContext {
arrow_schema,
parquet_schema,
parquet_metadata,
file_metrics,
)
.access_plan
};
self.prune_pages(access_plan, &ctx).prune().access_plan
}

/// Begin a page-index prune. Returns a [`PagePrune`] op that can
/// optionally take an observer before running:
///
/// ```ignore
/// let result = filter.prune_pages(access_plan, &ctx)
/// .with_observer(&mut obs) // optional
/// .prune();
/// ```
pub(crate) fn prune_pages<'a>(
&'a self,
access_plan: ParquetAccessPlan,
ctx: &'a PagePruningContext<'a>,
) -> PagePrune<'a> {
PagePrune {
filter: self,
access_plan,
ctx,
observer: None,
}
}

/// Returns an updated [`ParquetAccessPlan`] and metrics by applying predicates
/// to the parquet page index, if any.
pub(crate) fn prune_plan_with_page_index_and_metrics(
/// Workhorse for page-index pruning. The observer fires
/// `on_leaf(tag, mask)` once per predicate that is actually
/// evaluated; `mask[i] = true` means row group `i` still has at
/// least one page that may match that leaf. Predicates skipped by
/// the per-row-group `!selects_any` short-circuit are not observed,
/// so per-conjunct stats are not biased by predicates that never
/// ran (resolves the reviewer's Q2 concern on PR #22235).
fn run_prune(
&self,
mut access_plan: ParquetAccessPlan,
arrow_schema: &Schema,
parquet_schema: &SchemaDescriptor,
parquet_metadata: &ParquetMetaData,
file_metrics: &ParquetFileMetrics,
ctx: &PagePruningContext<'_>,
observer: &mut dyn PruningObserver,
) -> PagePruningResult {
let PagePruningContext {
arrow_schema,
parquet_schema,
parquet_metadata,
file_metrics,
} = *ctx;
// scoped timer updates on drop
let _timer_guard = file_metrics.page_index_eval_time.timer();
if self.predicates.is_empty() {
Expand All @@ -210,6 +376,16 @@ impl PagePruningAccessPlanFilter {
let page_index_predicates = &self.predicates;
let groups = parquet_metadata.row_groups();

// Per-leaf "did this row group still have any matching pages
// after the leaf alone?" mask. Built across all row groups,
// emitted via `observer.on_leaf` at the end so each leaf gets
// exactly one observation per call to this function. Leaves
// never evaluated (because an earlier conjunct emptied the
// running row selection — `!selects_any` break below) end up
// with `None` here and are intentionally not observed.
let mut per_leaf_mask: Vec<Option<Vec<bool>>> =
(0..page_index_predicates.len()).map(|_| None).collect();

if groups.is_empty() {
return PagePruningResult::new(access_plan, 0);
}
Expand Down Expand Up @@ -262,7 +438,8 @@ impl PagePruningAccessPlanFilter {
let mut matched_pages_in_group: HashSet<usize> =
HashSet::from_iter(0..total_pages_in_group);

for predicate in page_index_predicates {
for (leaf_idx, tagged) in page_index_predicates.iter().enumerate() {
let predicate = &tagged.predicate;
let Some(column) = predicate.required_columns().single_column() else {
debug!(
"Ignoring multi-column page pruning predicate: {:?}",
Expand Down Expand Up @@ -307,6 +484,15 @@ impl PagePruningAccessPlanFilter {
predicate.predicate_expr(),
);

// Per-leaf observation: this leaf ran for `row_group_index`
// and produced `selection`. Whether this row group is
// "kept" by the leaf is `selection.selects_any()`. The
// entry is created lazily so leaves never observed (due
// to short-circuit) stay `None`.
let mask = per_leaf_mask[leaf_idx]
.get_or_insert_with(|| vec![false; groups.len()]);
mask[row_group_index] = selection.selects_any();

let matched_pages_indexes: HashSet<_> = pages
.into_iter()
.enumerate()
Expand Down Expand Up @@ -369,6 +555,18 @@ impl PagePruningAccessPlanFilter {
file_metrics
.page_index_pages_pruned
.add_matched(total_pages_select);

// Emit one observer event per leaf that was actually
// evaluated against at least one row group. Leaves that the
// outer `!selects_any` short-circuit prevented from running
// stay `None` here and are correctly absent from the stats.
for (leaf_idx, mask_opt) in per_leaf_mask.into_iter().enumerate() {
if let Some(mask) = mask_opt {
let tag = page_index_predicates[leaf_idx].tag;
observer.on_leaf(tag, &mask);
}
}

PagePruningResult::new(access_plan, total_pages_skipped_by_fully_matched)
}

Expand Down
Loading
Loading