From 0183a120451977f7082d93eb5ea57886eb74abf8 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 15 May 2026 14:12:09 -0700 Subject: [PATCH 1/2] feat: add OptionalFilterPhysicalExpr wrapper + proto support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce `OptionalFilterPhysicalExpr`, a transparent `PhysicalExpr` wrapper that marks a filter as *optional* — droppable without affecting query correctness. It delegates every `PhysicalExpr` method to the inner expression, so it is behavior-neutral until a consumer explicitly checks for the marker. This is the foundation for adaptive filter scheduling: a scan can detect the wrapper and drop a performance-hint filter (e.g. a hash-join dynamic filter) when it is not cost-effective, knowing correctness is enforced elsewhere. Also adds proto serialization (`PhysicalOptionalFilterNode`) so physical plans containing the wrapper round-trip faithfully. No caller wraps anything yet — that arrives with the adaptive parquet scan later in the stack. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../physical-expr-common/src/physical_expr.rs | 258 ++++++++++++++++++ datafusion/proto/proto/datafusion.proto | 6 + datafusion/proto/src/generated/pbjson.rs | 105 +++++++ datafusion/proto/src/generated/prost.rs | 9 +- .../proto/src/physical_plan/from_proto.rs | 11 + .../proto/src/physical_plan/to_proto.rs | 12 + 6 files changed, 400 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 212dca6cd57b0..1def37601dd1b 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -724,6 +724,163 @@ pub fn is_volatile(expr: &Arc) -> bool { is_volatile } +/// A transparent wrapper that marks a [`PhysicalExpr`] as *optional* — i.e., +/// droppable without affecting query correctness. +/// +/// This is used for filters that are performance hints (e.g., dynamic join +/// filters) as opposed to mandatory predicates. The selectivity tracker can +/// detect this wrapper via `expr.as_any().downcast_ref::()` +/// and choose to drop the filter entirely when it is not cost-effective. +/// +/// All [`PhysicalExpr`] methods are delegated to the wrapped inner expression. +/// +/// Currently used by `HashJoinExec` for dynamic join filters. When the +/// selectivity tracker drops such a filter, the join still enforces +/// correctness independently — "dropped" simply means the filter is never +/// applied as a scan-time optimization. +#[derive(Debug)] +pub struct OptionalFilterPhysicalExpr { + inner: Arc, +} + +impl OptionalFilterPhysicalExpr { + /// Create a new optional filter wrapping the given expression. + pub fn new(inner: Arc) -> Self { + Self { inner } + } + + /// Returns a clone of the inner (unwrapped) expression. 
+ pub fn inner(&self) -> Arc { + Arc::clone(&self.inner) + } +} + +impl Display for OptionalFilterPhysicalExpr { + /// Pass through to the inner expression. Surfacing the `Optional(..)` + /// wrapper in plan output would require updating dozens of sqllogictest + /// baselines for what is purely a runtime concept (the adaptive + /// scheduler's permission to drop this filter); plan readers don't need + /// to see it. + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.inner) + } +} + +impl PartialEq for OptionalFilterPhysicalExpr { + fn eq(&self, other: &Self) -> bool { + self.inner.as_ref() == other.inner.as_ref() + } +} + +impl Eq for OptionalFilterPhysicalExpr {} + +impl Hash for OptionalFilterPhysicalExpr { + fn hash(&self, state: &mut H) { + self.inner.as_ref().hash(state); + } +} + +impl PhysicalExpr for OptionalFilterPhysicalExpr { + fn data_type(&self, input_schema: &Schema) -> Result { + self.inner.data_type(input_schema) + } + + fn nullable(&self, input_schema: &Schema) -> Result { + self.inner.nullable(input_schema) + } + + fn evaluate(&self, batch: &RecordBatch) -> Result { + self.inner.evaluate(batch) + } + + fn return_field(&self, input_schema: &Schema) -> Result { + self.inner.return_field(input_schema) + } + + fn evaluate_selection( + &self, + batch: &RecordBatch, + selection: &BooleanArray, + ) -> Result { + self.inner.evaluate_selection(batch, selection) + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.inner] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + assert_eq_or_internal_err!( + children.len(), + 1, + "OptionalFilterPhysicalExpr: expected 1 child" + ); + Ok(Arc::new(OptionalFilterPhysicalExpr::new(Arc::clone( + &children[0], + )))) + } + + fn evaluate_bounds(&self, children: &[&Interval]) -> Result { + self.inner.evaluate_bounds(children) + } + + fn propagate_constraints( + &self, + interval: &Interval, + children: &[&Interval], + ) -> Result>> { + 
self.inner.propagate_constraints(interval, children) + } + + #[expect(deprecated)] + fn evaluate_statistics(&self, children: &[&Distribution]) -> Result { + self.inner.evaluate_statistics(children) + } + + #[expect(deprecated)] + fn propagate_statistics( + &self, + parent: &Distribution, + children: &[&Distribution], + ) -> Result>> { + self.inner.propagate_statistics(parent, children) + } + + fn get_properties(&self, children: &[ExprProperties]) -> Result { + self.inner.get_properties(children) + } + + fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result { + self.inner.fmt_sql(f) + } + + fn snapshot(&self) -> Result>> { + // Always unwrap the Optional wrapper for snapshot consumers (e.g. PruningPredicate). + // If inner has a snapshot, use it; otherwise return the inner directly. + Ok(Some(match self.inner.snapshot()? { + Some(snap) => snap, + None => Arc::clone(&self.inner), + })) + } + + fn snapshot_generation(&self) -> u64 { + // The wrapper itself is not dynamic; tree-walking picks up + // inner's generation via children(). 
+ 0 + } + + fn is_volatile_node(&self) -> bool { + self.inner.is_volatile_node() + } + + fn placement(&self) -> ExpressionPlacement { + self.inner.placement() + } +} + #[cfg(test)] mod test { use crate::physical_expr::PhysicalExpr; @@ -731,6 +888,7 @@ mod test { use arrow::datatypes::{DataType, Schema}; use datafusion_expr_common::columnar_value::ColumnarValue; use std::fmt::{Display, Formatter}; + use std::hash::{Hash, Hasher}; use std::sync::Arc; #[derive(Debug, PartialEq, Eq, Hash)] @@ -905,4 +1063,104 @@ mod test { &BooleanArray::from(vec![true; 5]), ); } + + #[test] + fn test_optional_filter_downcast() { + use super::OptionalFilterPhysicalExpr; + + let inner: Arc = Arc::new(TestExpr {}); + let optional = Arc::new(OptionalFilterPhysicalExpr::new(Arc::clone(&inner))); + + // Can downcast to detect the wrapper + let as_physical: Arc = optional; + assert!( + as_physical + .downcast_ref::() + .is_some() + ); + + // Inner expr is NOT detectable as optional + assert!(inner.downcast_ref::().is_none()); + } + + #[test] + fn test_optional_filter_delegates_evaluate() { + use super::OptionalFilterPhysicalExpr; + + let inner: Arc = Arc::new(TestExpr {}); + let optional = OptionalFilterPhysicalExpr::new(Arc::clone(&inner)); + + let batch = + unsafe { RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 5) }; + let result = optional.evaluate(&batch).unwrap(); + let array = result.to_array(5).unwrap(); + assert_eq!(array.len(), 5); + } + + #[test] + fn test_optional_filter_children_and_with_new_children() { + use super::OptionalFilterPhysicalExpr; + + let inner: Arc = Arc::new(TestExpr {}); + let optional = Arc::new(OptionalFilterPhysicalExpr::new(Arc::clone(&inner))); + + // children() returns the inner + let children = optional.children(); + assert_eq!(children.len(), 1); + + // with_new_children preserves the wrapper + let new_inner: Arc = Arc::new(TestExpr {}); + let rewrapped = Arc::clone(&optional) + .with_new_children(vec![new_inner]) + .unwrap(); + assert!( + 
rewrapped + .downcast_ref::() + .is_some() + ); + } + + #[test] + fn test_optional_filter_inner() { + use super::OptionalFilterPhysicalExpr; + + let inner: Arc = Arc::new(TestExpr {}); + let optional = OptionalFilterPhysicalExpr::new(Arc::clone(&inner)); + + // inner() returns a clone of the wrapped expression + let unwrapped = optional.inner(); + assert!(unwrapped.downcast_ref::().is_some()); + } + + #[test] + fn test_optional_filter_snapshot_generation_zero() { + use super::OptionalFilterPhysicalExpr; + + let inner: Arc = Arc::new(TestExpr {}); + let optional = OptionalFilterPhysicalExpr::new(inner); + + assert_eq!(optional.snapshot_generation(), 0); + } + + #[test] + fn test_optional_filter_eq_hash() { + use super::OptionalFilterPhysicalExpr; + use std::collections::hash_map::DefaultHasher; + + let inner1: Arc = Arc::new(TestExpr {}); + let inner2: Arc = Arc::new(TestExpr {}); + + let opt1 = OptionalFilterPhysicalExpr::new(inner1); + let opt2 = OptionalFilterPhysicalExpr::new(inner2); + + // Same inner type → equal + assert_eq!(opt1, opt2); + + // Same hash + let mut h1 = DefaultHasher::new(); + let mut h2 = DefaultHasher::new(); + opt1.hash(&mut h1); + opt2.hash(&mut h2); + assert_eq!(h1.finish(), h2.finish()); + } } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 865887d41e111..1edf45c550bf2 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -936,6 +936,8 @@ message PhysicalExprNode { PhysicalScalarSubqueryExprNode scalar_subquery = 22; PhysicalDynamicFilterNode dynamic_filter = 23; + + PhysicalOptionalFilterNode optional_filter = 24; } } @@ -947,6 +949,10 @@ message PhysicalDynamicFilterNode { bool is_complete = 5; } +message PhysicalOptionalFilterNode { + PhysicalExprNode inner = 1; +} + message PhysicalScalarUdfNode { string name = 1; repeated PhysicalExprNode args = 2; diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs index b8639afd04a89..3a9c4d27a1321 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -16972,6 +16972,9 @@ impl serde::Serialize for PhysicalExprNode { physical_expr_node::ExprType::DynamicFilter(v) => { struct_ser.serialize_field("dynamicFilter", v)?; } + physical_expr_node::ExprType::OptionalFilter(v) => { + struct_ser.serialize_field("optionalFilter", v)?; + } } } struct_ser.end() @@ -17022,6 +17025,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "scalarSubquery", "dynamic_filter", "dynamicFilter", + "optional_filter", + "optionalFilter", ]; #[allow(clippy::enum_variant_names)] @@ -17048,6 +17053,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { HashExpr, ScalarSubquery, DynamicFilter, + OptionalFilter, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -17091,6 +17097,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "hashExpr" | "hash_expr" => Ok(GeneratedField::HashExpr), "scalarSubquery" | "scalar_subquery" => Ok(GeneratedField::ScalarSubquery), "dynamicFilter" | "dynamic_filter" => Ok(GeneratedField::DynamicFilter), + "optionalFilter" | "optional_filter" => Ok(GeneratedField::OptionalFilter), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -17267,6 +17274,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { return Err(serde::de::Error::duplicate_field("dynamicFilter")); } expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::DynamicFilter) +; + } + GeneratedField::OptionalFilter => { + if expr_type__.is_some() { + return Err(serde::de::Error::duplicate_field("optionalFilter")); + } + expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::OptionalFilter) ; } } @@ -18380,6 +18394,97 @@ impl<'de> serde::Deserialize<'de> for PhysicalNot { 
deserializer.deserialize_struct("datafusion.PhysicalNot", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalOptionalFilterNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.inner.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalOptionalFilterNode", len)?; + if let Some(v) = self.inner.as_ref() { + struct_ser.serialize_field("inner", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalOptionalFilterNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "inner", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Inner, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "inner" => Ok(GeneratedField::Inner), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalOptionalFilterNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalOptionalFilterNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: 
serde::de::MapAccess<'de>, + { + let mut inner__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Inner => { + if inner__.is_some() { + return Err(serde::de::Error::duplicate_field("inner")); + } + inner__ = map_.next_value()?; + } + } + } + Ok(PhysicalOptionalFilterNode { + inner: inner__, + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalOptionalFilterNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalPlanNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index b742e82ea24ec..c661872454afb 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1336,7 +1336,7 @@ pub struct PhysicalExprNode { pub expr_id: ::core::option::Option, #[prost( oneof = "physical_expr_node::ExprType", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21, 22, 23" + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21, 22, 23, 24" )] pub expr_type: ::core::option::Option, } @@ -1393,6 +1393,8 @@ pub mod physical_expr_node { ScalarSubquery(super::PhysicalScalarSubqueryExprNode), #[prost(message, tag = "23")] DynamicFilter(::prost::alloc::boxed::Box), + #[prost(message, tag = "24")] + OptionalFilter(::prost::alloc::boxed::Box), } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -1409,6 +1411,11 @@ pub struct PhysicalDynamicFilterNode { pub is_complete: bool, } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalOptionalFilterNode { + #[prost(message, optional, boxed, tag = "1")] + pub inner: ::core::option::Option<::prost::alloc::boxed::Box>, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalScalarUdfNode { #[prost(string, tag = "1")] pub name: ::prost::alloc::string::String, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs 
b/datafusion/proto/src/physical_plan/from_proto.rs index 43ebf0474320a..41807491bda79 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -63,6 +63,7 @@ use crate::{convert_required, protobuf}; use datafusion_physical_expr::expressions::{ DynamicFilterInner, DynamicFilterPhysicalExpr, }; +use datafusion_physical_expr_common::physical_expr::OptionalFilterPhysicalExpr; impl From<&protobuf::PhysicalColumn> for Column { fn from(c: &protobuf::PhysicalColumn) -> Column { @@ -561,6 +562,16 @@ pub fn parse_physical_expr_with_converter( )); base_filter } + ExprType::OptionalFilter(optional_filter) => { + let inner = parse_required_physical_expr( + optional_filter.inner.as_deref(), + ctx, + "inner", + input_schema, + proto_converter, + )?; + Arc::new(OptionalFilterPhysicalExpr::new(inner)) + } ExprType::Extension(extension) => { let inputs: Vec> = extension .inputs diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 83c11cfc6b299..84df5acec73bb 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -34,6 +34,7 @@ use datafusion_expr::WindowFrame; use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; +use datafusion_physical_expr_common::physical_expr::OptionalFilterPhysicalExpr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, DynamicFilterPhysicalExpr, InListExpr, @@ -569,6 +570,17 @@ pub fn serialize_physical_expr_with_converter( }), )), }) + } else if let Some(opt) = expr.downcast_ref::() { + let inner_expr = + Box::new(proto_converter.physical_expr_to_proto(&opt.inner(), codec)?); + Ok(protobuf::PhysicalExprNode { + expr_id, + 
expr_type: Some(protobuf::physical_expr_node::ExprType::OptionalFilter( + Box::new(protobuf::PhysicalOptionalFilterNode { + inner: Some(inner_expr), + }), + )), + }) } else { let mut buf: Vec = vec![]; match codec.try_encode_expr(value, &mut buf) { From 138c3c6ad82447efd1c13d4b6bd3435254f59850 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 15 May 2026 14:14:46 -0700 Subject: [PATCH 2/2] feat: per-conjunct pruning statistics for PruningPredicate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an opt-in way to learn, per individual conjunct, how effective each predicate was during pruning — without running any extra pruning passes. - `PruningPredicate::try_new_tagged_conjuncts` builds a predicate from AND-conjuncts, each carrying a caller-supplied tag. - `PruningPredicate::prune_per_conjunct` returns the usual prune mask plus per-conjunct `PerConjunctPruneStats` (rows/containers seen vs. skipped) as a side effect of the pruning iteration that already runs. - `RowGroupAccessPlanFilter::prune_by_statistics_with_per_conjunct_stats` and `PagePruningAccessPlanFilter::prune_plan_with_per_conjunct_stats` surface those stats for row-group and page-index pruning respectively. The existing untagged `prune` / `prune_by_statistics` / `prune_plan_with_page_index` paths are preserved and unchanged; the new methods return empty stats on the untagged path. No in-tree caller uses the tagged path yet — the adaptive parquet scan consumes it later in the stack as a selectivity prior. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../datasource-parquet/src/page_filter.rs | 260 +++++++++++++++++- .../src/row_group_filter.rs | 45 ++- datafusion/pruning/src/lib.rs | 4 +- datafusion/pruning/src/pruning_predicate.rs | 209 +++++++++++++- 4 files changed, 500 insertions(+), 18 deletions(-) diff --git a/datafusion/datasource-parquet/src/page_filter.rs b/datafusion/datasource-parquet/src/page_filter.rs index fa3e2dd44d9ab..076fffc248718 100644 --- a/datafusion/datasource-parquet/src/page_filter.rs +++ b/datafusion/datasource-parquet/src/page_filter.rs @@ -113,6 +113,37 @@ pub struct PagePruningAccessPlanFilter { /// single column predicates (e.g. (`col = 5`) extracted from the overall /// predicate. Must all be true for a row to be included in the result. predicates: Vec, + /// Per-predicate tag (caller-supplied id, typically a `FilterId`). + /// `None` when the filter was constructed without tagging via + /// [`Self::new`]; `Some` when constructed via [`Self::new_tagged`]. + /// The vector has the same length as `predicates`. + tags: Option>, +} + +/// Per-conjunct accumulators surfaced by +/// [`PagePruningAccessPlanFilter::prune_plan_with_per_conjunct_stats`]. +/// One entry per kept predicate (in the same order as `predicates`). +#[derive(Clone, Debug, Default)] +pub struct PerConjunctPageStats { + /// Caller tag (e.g. FilterId) — `None` when the filter was built + /// untagged via [`PagePruningAccessPlanFilter::new`]. + pub tag: Option, + /// Total rows in row groups where this conjunct was evaluated. + pub rows_seen: u64, + /// Rows the page index proved this conjunct alone would skip. + pub rows_skipped: u64, +} + +impl PerConjunctPageStats { + /// Returns the per-conjunct page-pruning rate, or `None` when no + /// rows were evaluated (e.g. the file has no page index for this + /// column, or the predicate's converter couldn't be built). 
+ pub fn pruning_rate(&self) -> Option { + if self.rows_seen == 0 { + return None; + } + Some(self.rows_skipped as f64 / self.rows_seen as f64) + } } impl PagePruningAccessPlanFilter { @@ -148,7 +179,50 @@ impl PagePruningAccessPlanFilter { Some(pp) }) .collect::>(); - Self { predicates } + Self { + predicates, + tags: None, + } + } + + /// Variant of [`Self::new`] that takes already-split conjuncts each + /// carrying a caller tag (usually a `FilterId`). Predicates that + /// fail the same single-column / non-trivial filtering as `new` + /// are dropped, but tags survive for the conjuncts that make it + /// through. Subsequent calls to + /// [`Self::prune_plan_with_per_conjunct_stats`] return per-conjunct + /// pruning stats keyed by tag. + pub fn new_tagged( + conjuncts: &[(usize, Arc)], + schema: &SchemaRef, + ) -> Self { + let mut predicates = Vec::with_capacity(conjuncts.len()); + let mut tags = Vec::with_capacity(conjuncts.len()); + for (id, expr) in conjuncts { + let pp = match PruningPredicate::try_new(Arc::clone(expr), Arc::clone(schema)) + { + Ok(pp) => pp, + Err(e) => { + debug!( + "Ignoring error creating tagged page pruning predicate \ + for filter id {id}: {e}" + ); + continue; + } + }; + if pp.always_true() { + continue; + } + if pp.required_columns().single_column().is_none() { + continue; + } + predicates.push(pp); + tags.push(*id); + } + Self { + predicates, + tags: Some(tags), + } } /// Returns an updated [`ParquetAccessPlan`] by applying predicates to the @@ -336,6 +410,190 @@ impl PagePruningAccessPlanFilter { pub fn filter_number(&self) -> usize { self.predicates.len() } + + /// Like [`Self::prune_plan_with_page_index`] but also surfaces, as a + /// side-effect of the pruning iteration that already runs, a + /// per-conjunct accumulator with the rows that conjunct alone + /// would have proven skippable. 
Callers use this to seed a + /// per-FilterId selectivity prior without doing any extra pruning + /// work — every page-index lookup that would have happened in + /// `prune_plan_with_page_index` happens exactly once here too. + pub fn prune_plan_with_per_conjunct_stats( + &self, + mut access_plan: ParquetAccessPlan, + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor, + parquet_metadata: &ParquetMetaData, + file_metrics: &ParquetFileMetrics, + ) -> (ParquetAccessPlan, Vec, usize) { + // scoped timer updates on drop + let _timer_guard = file_metrics.page_index_eval_time.timer(); + + let mut per_conjunct: Vec = (0..self.predicates.len()) + .map(|i| PerConjunctPageStats { + tag: self.tags.as_ref().and_then(|t| t.get(i).copied()), + rows_seen: 0, + rows_skipped: 0, + }) + .collect(); + + if self.predicates.is_empty() { + return (access_plan, per_conjunct, 0); + } + + let groups = parquet_metadata.row_groups(); + if groups.is_empty() { + return (access_plan, per_conjunct, 0); + } + + if parquet_metadata.offset_index().is_none() + || parquet_metadata.column_index().is_none() + { + return (access_plan, per_conjunct, 0); + } + + // Same accumulators as the untagged path, plus per-conjunct. + let mut total_skip = 0; + let mut total_select = 0; + let mut total_pages_skip = 0; + let mut total_pages_select = 0; + // Pages we skipped pruning for because row-group stats already + // proved the row group is fully matched — wasted work avoided, + // surfaced as a metric. + let mut total_pages_skipped_by_fully_matched = 0; + + let row_group_indexes = access_plan.row_group_indexes(); + for row_group_index in row_group_indexes { + // Skip page pruning for fully matched row groups: all rows are + // known to satisfy the predicate, so page-level pruning is + // wasted work. Still feed the rows into `rows_seen` per + // conjunct so per-FilterId pruning rates reflect the file's + // full row count rather than just the non-fully-matched part. 
+ if access_plan.is_fully_matched(row_group_index) { + let page_count = + fully_matched_page_count(row_group_index, parquet_metadata); + total_pages_skipped_by_fully_matched += page_count; + let rg_rows = groups[row_group_index].num_rows() as u64; + for stats in per_conjunct.iter_mut() { + stats.rows_seen = stats.rows_seen.saturating_add(rg_rows); + } + continue; + } + let rg_rows = groups[row_group_index].num_rows() as u64; + let mut overall_selection = None; + + let total_pages_in_group = + parquet_metadata.offset_index().map_or(0, |offset_index| { + offset_index[row_group_index] + .first() + .map_or(0, |column| column.page_locations.len()) + }); + // Intersection of per-conjunct matched pages, matching the + // untagged path's behavior so the page-level metric reflects + // the AND of all predicates rather than a per-conjunct sum. + let mut matched_pages_in_group: HashSet = + HashSet::from_iter(0..total_pages_in_group); + + for (i, predicate) in self.predicates.iter().enumerate() { + per_conjunct[i].rows_seen = + per_conjunct[i].rows_seen.saturating_add(rg_rows); + + let column = predicate + .required_columns() + .single_column() + .expect("Page pruning requires single column predicates"); + + let converter = match StatisticsConverter::try_new( + column.name(), + arrow_schema, + parquet_schema, + ) { + Ok(c) => c, + Err(e) => { + debug!( + "Could not create statistics converter for column {}: {e}", + column.name() + ); + continue; + } + }; + + let selection = prune_pages_in_one_row_group( + row_group_index, + predicate, + converter, + parquet_metadata, + file_metrics, + ); + + let Some((selection, page_match_flags)) = selection else { + continue; + }; + let matched_pages_indexes: HashSet<_> = page_match_flags + .into_iter() + .enumerate() + .filter(|x| x.1) + .map(|x| x.0) + .collect(); + matched_pages_in_group.retain(|x| matched_pages_indexes.contains(x)); + + // Per-conjunct skipped rows for this row group: anything + // the predicate's selection didn't 
include is something + // this conjunct alone proved skippable. + let kept_rows_for_conjunct = selection.row_count() as u64; + let skipped_rows_for_conjunct = + rg_rows.saturating_sub(kept_rows_for_conjunct); + per_conjunct[i].rows_skipped = per_conjunct[i] + .rows_skipped + .saturating_add(skipped_rows_for_conjunct); + + overall_selection = update_selection(overall_selection, selection); + + let selects_any = overall_selection + .as_ref() + .map(|sel| sel.selects_any()) + .unwrap_or(true); + if !selects_any { + break; + } + } + + let pages_matched = matched_pages_in_group.len(); + total_pages_select += pages_matched; + total_pages_skip += total_pages_in_group - pages_matched; + + if let Some(overall_selection) = overall_selection { + let rows_selected = overall_selection.row_count(); + if rows_selected > 0 { + let rows_skipped = overall_selection.skipped_row_count(); + total_skip += rows_skipped; + total_select += rows_selected; + access_plan.scan_selection(row_group_index, overall_selection); + } else { + let rows_skipped = groups[row_group_index].num_rows() as usize; + access_plan.skip(row_group_index); + total_skip += rows_skipped; + } + } + } + + file_metrics.page_index_rows_pruned.add_pruned(total_skip); + file_metrics + .page_index_rows_pruned + .add_matched(total_select); + file_metrics + .page_index_pages_pruned + .add_pruned(total_pages_skip); + file_metrics + .page_index_pages_pruned + .add_matched(total_pages_select); + + ( + access_plan, + per_conjunct, + total_pages_skipped_by_fully_matched, + ) + } } fn update_selection( diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index c45e69600f70c..b4aa07c9f2378 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -253,6 +253,28 @@ impl RowGroupAccessPlanFilter { predicate: &PruningPredicate, metrics: &ParquetFileMetrics, ) { + 
self.prune_by_statistics_with_per_conjunct_stats( + arrow_schema, + parquet_schema, + groups, + predicate, + metrics, + ); + } + + /// Variant of [`Self::prune_by_statistics`] that also returns + /// per-conjunct pruning stats produced by + /// [`PruningPredicate::prune_per_conjunct`]. Returns an empty + /// `Vec` when the predicate was not constructed with tagged + /// conjuncts, so callers can ignore it on the untagged path. + pub fn prune_by_statistics_with_per_conjunct_stats( + &mut self, + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor, + groups: &[RowGroupMetaData], + predicate: &PruningPredicate, + metrics: &ParquetFileMetrics, + ) -> Vec { // scoped timer updates on drop let _timer_guard = metrics.statistics_eval_time.timer(); @@ -275,9 +297,14 @@ impl RowGroupAccessPlanFilter { missing_null_counts_as_zero: true, }; - // try to prune the row groups in a single call - match predicate.prune(&pruning_stats) { - Ok(values) => { + let mut per_conjunct: Vec = Vec::new(); + + // try to prune the row groups in a single call (now also captures + // per-conjunct rates when the predicate was built with + // `try_new_tagged_conjuncts`). + match predicate.prune_per_conjunct(&pruning_stats) { + Ok((values, stats)) => { + per_conjunct = stats; let mut fully_contained_candidates_original_idx: Vec = Vec::new(); for (idx, &value) in row_group_indexes.iter().zip(values.iter()) { if !value { @@ -305,6 +332,8 @@ impl RowGroupAccessPlanFilter { metrics.predicate_evaluation_errors.add(1); } } + + per_conjunct } /// Identifies row groups that are fully matched by the predicate. 
@@ -607,11 +636,11 @@ impl PruningStatistics for BloomFilterStatistics {
 }
 
 /// Wraps a slice of [`RowGroupMetaData`] in a way that implements [`PruningStatistics`]
-struct RowGroupPruningStatistics<'a> {
-    parquet_schema: &'a SchemaDescriptor,
-    row_group_metadatas: Vec<&'a RowGroupMetaData>,
-    arrow_schema: &'a Schema,
-    missing_null_counts_as_zero: bool,
+pub(crate) struct RowGroupPruningStatistics<'a> {
+    pub(crate) parquet_schema: &'a SchemaDescriptor,
+    pub(crate) row_group_metadatas: Vec<&'a RowGroupMetaData>,
+    pub(crate) arrow_schema: &'a Schema,
+    pub(crate) missing_null_counts_as_zero: bool,
 }
 
 impl<'a> RowGroupPruningStatistics<'a> {
diff --git a/datafusion/pruning/src/lib.rs b/datafusion/pruning/src/lib.rs
index be17f29eaafa0..334aed77a7b97 100644
--- a/datafusion/pruning/src/lib.rs
+++ b/datafusion/pruning/src/lib.rs
@@ -22,6 +22,6 @@ mod pruning_predicate;
 pub use file_pruner::FilePruner;
 pub use pruning_predicate::{
-    PredicateRewriter, PruningPredicate, PruningStatistics, RequiredColumns,
-    UnhandledPredicateHook, build_pruning_predicate,
+    PerConjunctPruneStats, PredicateRewriter, PruningPredicate, PruningStatistics,
+    RequiredColumns, UnhandledPredicateHook, build_pruning_predicate,
 };
diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs
index 013a06812a13c..986c34e5d21a8 100644
--- a/datafusion/pruning/src/pruning_predicate.rs
+++ b/datafusion/pruning/src/pruning_predicate.rs
@@ -375,6 +375,51 @@ pub struct PruningPredicate {
     ///
     /// See [`PruningPredicate::literal_guarantees`] for more details.
     literal_guarantees: Vec<LiteralGuarantee>,
+    /// Optional per-conjunct sub-predicates, populated when the
+    /// constructor splits a top-level AND into separate
+    /// `PruningPredicate`s. When present, [`Self::prune_per_conjunct`]
+    /// evaluates each sub-predicate to produce per-conjunct pruning
+    /// rates; ordinary [`Self::prune`] is unchanged.
+    ///
+    /// Only the leaves are populated — the sub-predicates themselves
+    /// have `sub_predicates: None`.
+    sub_predicates: Option<Vec<TaggedSubPredicate>>,
+}
+
+/// A per-conjunct sub-predicate paired with an optional caller tag
+/// (typically a `FilterId` chosen by the caller). Lives behind
+/// [`PruningPredicate::sub_predicates`].
+#[derive(Debug, Clone)]
+struct TaggedSubPredicate {
+    /// Caller tag (`None` when constructed without tagging).
+    tag: Option<usize>,
+    /// The per-conjunct PruningPredicate.
+    predicate: PruningPredicate,
+}
+
+/// Result of [`PruningPredicate::prune_per_conjunct`] for one
+/// sub-predicate.
+#[derive(Debug, Clone, Default)]
+pub struct PerConjunctPruneStats {
+    /// Caller tag (e.g. `FilterId`), `None` when constructed via
+    /// [`PruningPredicate::try_new`] without tagging.
+    pub tag: Option<usize>,
+    /// Number of containers (row groups) the sub-predicate was
+    /// evaluated against.
+    pub containers_seen: usize,
+    /// Number of containers this sub-predicate alone would prune.
+    pub containers_pruned: usize,
+}
+
+impl PerConjunctPruneStats {
+    /// Pruning rate for this conjunct, or `None` when no containers
+    /// were evaluated.
+    pub fn pruning_rate(&self) -> Option<f64> {
+        if self.containers_seen == 0 {
+            return None;
+        }
+        Some(self.containers_pruned as f64 / self.containers_seen as f64)
+    }
+}
 
 /// Build a pruning predicate from an optional predicate expression.
@@ -499,9 +544,77 @@ impl PruningPredicate {
             required_columns,
             orig_expr: expr,
             literal_guarantees,
+            sub_predicates: None,
         })
     }
 
+    /// Like [`Self::try_new`] but takes already-split top-level
+    /// conjuncts each carrying a caller tag (typically a `FilterId`).
+    /// Builds one [`PruningPredicate`] per conjunct as a leaf
+    /// sub-predicate. The wrapper itself is a marker holding the
+    /// leaves; calls to [`Self::prune`] AND the leaves' results,
+    /// avoiding a redundant combined-predicate construction.
+    /// [`Self::prune_per_conjunct`] also reads from the same leaves.
+    ///
+    /// Conjuncts whose individual `try_new` would error or return
+    /// always-true are silently skipped (their tags do not appear in
+    /// the per-conjunct output).
+    pub fn try_new_tagged_conjuncts(
+        tagged: &[(usize, Arc<dyn PhysicalExpr>)],
+        schema: SchemaRef,
+    ) -> Result<Self> {
+        // Build per-conjunct PruningPredicates (each is a leaf — i.e.
+        // its own `sub_predicates` is `None`).
+        let mut sub_predicates: Vec<TaggedSubPredicate> = Vec::new();
+        for (tag, expr) in tagged {
+            match Self::try_new(Arc::clone(expr), Arc::clone(&schema)) {
+                Ok(p) if !p.always_true() => {
+                    sub_predicates.push(TaggedSubPredicate {
+                        tag: Some(*tag),
+                        predicate: p,
+                    });
+                }
+                Ok(_) => {
+                    // always-true: skip; leaves the tag unrepresented.
+                    continue;
+                }
+                Err(e) => {
+                    debug!("try_new_tagged_conjuncts: skipping conjunct {tag}: {e}");
+                    continue;
+                }
+            }
+        }
+
+        // Build a marker wrapper. Its own `predicate_expr` is a
+        // literal `true` placeholder; `prune` is special-cased below
+        // to AND the leaves' results when `sub_predicates` is set.
+        let placeholder_expr: Arc<dyn PhysicalExpr> =
+            Arc::new(phys_expr::Literal::new(ScalarValue::from(true)));
+        let combined_orig: Arc<dyn PhysicalExpr> = if tagged.is_empty() {
+            Arc::clone(&placeholder_expr)
+        } else {
+            datafusion_physical_expr::conjunction(
+                tagged
+                    .iter()
+                    .map(|(_, e)| Arc::clone(e))
+                    .collect::<Vec<_>>(),
+            )
+        };
+        let wrapper = Self {
+            schema,
+            predicate_expr: placeholder_expr,
+            required_columns: RequiredColumns::new(),
+            orig_expr: combined_orig,
+            literal_guarantees: Vec::new(),
+            sub_predicates: if sub_predicates.is_empty() {
+                None
+            } else {
+                Some(sub_predicates)
+            },
+        };
+        Ok(wrapper)
+    }
+
     /// For each set of statistics, evaluates the pruning predicate
     /// and returns a `bool` with the following meaning for a
     /// all rows whose values match the statistics:
@@ -520,6 +633,22 @@ impl PruningPredicate {
         &self,
         statistics: &S,
     ) -> Result<Vec<bool>> {
+        // If we're a tagged-conjunct wrapper (our own `predicate_expr` is
+        // only a literal-true placeholder), AND the leaves' results.
+        if let Some(sub_predicates) = &self.sub_predicates {
+            let n = statistics.num_containers();
+            let mut combined = vec![true; n];
+            for sub in sub_predicates {
+                let leaf = sub.predicate.prune(statistics)?;
+                for (i, val) in leaf.iter().enumerate() {
+                    if i < combined.len() {
+                        combined[i] = combined[i] && *val;
+                    }
+                }
+            }
+            return Ok(combined);
+        }
+
         let mut builder = BoolVecBuilder::new(statistics.num_containers());
 
         // Try to prove the predicate can't be true for the containers based on
@@ -568,6 +697,50 @@ impl PruningPredicate {
         Ok(builder.build())
     }
 
+    /// Like [`Self::prune`] but also returns per-conjunct pruning
+    /// stats when this predicate was constructed via
+    /// [`Self::try_new_tagged_conjuncts`]. The `Vec<bool>` is the same
+    /// AND'd result `prune` would return; the per-conjunct stats are
+    /// computed by evaluating each leaf sub-predicate against the same
+    /// `statistics` and counting pruned/seen containers.
+    ///
+    /// Returns `(prune_result, vec![])` for predicates constructed via
+    /// the plain [`Self::try_new`] (no sub-predicates available).
+    pub fn prune_per_conjunct<S: PruningStatistics>(
+        &self,
+        statistics: &S,
+    ) -> Result<(Vec<bool>, Vec<PerConjunctPruneStats>)> {
+        let combined = self.prune(statistics)?;
+        let Some(sub_predicates) = &self.sub_predicates else {
+            return Ok((combined, Vec::new()));
+        };
+
+        let total_containers = statistics.num_containers();
+        let mut per_conjunct: Vec<PerConjunctPruneStats> =
+            Vec::with_capacity(sub_predicates.len());
+        for sub in sub_predicates {
+            let kept = sub.predicate.prune(statistics)?;
+            let containers_seen = kept.len();
+            let containers_pruned = containers_seen - kept.iter().filter(|b| **b).count();
+            // Sanity: every sub-predicate evaluates against the same
+            // statistics shape, so `kept.len() == total_containers`.
+            // If the implementation drift breaks that, fall back to
+            // skipping this conjunct rather than panicking.
+            if containers_seen != total_containers {
+                debug!(
+                    "prune_per_conjunct: sub-predicate seen={containers_seen} expected={total_containers}, skipping conjunct"
+                );
+                continue;
+            }
+            per_conjunct.push(PerConjunctPruneStats {
+                tag: sub.tag,
+                containers_seen,
+                containers_pruned,
+            });
+        }
+        Ok((combined, per_conjunct))
+    }
+
     /// Return a reference to the input schema
     pub fn schema(&self) -> &SchemaRef {
         &self.schema
@@ -599,6 +772,13 @@ impl PruningPredicate {
     ///
     /// This can happen when a predicate is simplified to a constant `true`
     pub fn always_true(&self) -> bool {
+        // A tagged-conjunct wrapper is never always-true unless every
+        // leaf is (which can't happen — always-true leaves are dropped
+        // at construction). So when sub_predicates is Some and
+        // non-empty, return false.
+        if let Some(subs) = &self.sub_predicates {
+            return subs.is_empty();
+        }
         is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
     }
 
@@ -614,14 +794,29 @@
     /// used in the predicate. For example, it can be used to avoid reading
     /// unneeded bloom filters (a non trivial operation).
     pub fn literal_columns(&self) -> Vec<String> {
+        // For tagged-conjunct wrappers, union the leaves' columns —
+        // the wrapper's own `literal_guarantees` is empty (its
+        // `predicate_expr` is a literal-true placeholder) but
+        // downstream consumers (e.g. `ParquetOpener` deciding which
+        // bloom filters to fetch) need the full set.
         let mut seen = HashSet::new();
-        self.literal_guarantees
-            .iter()
-            .map(|e| &e.column.name)
-            // avoid duplicates
-            .filter(|name| seen.insert(*name))
-            .map(|s| s.to_string())
-            .collect()
+        let mut out: Vec<String> = Vec::new();
+        if let Some(subs) = &self.sub_predicates {
+            for sub in subs {
+                for name in sub.predicate.literal_columns() {
+                    if seen.insert(name.clone()) {
+                        out.push(name);
+                    }
+                }
+            }
+        }
+        for e in &self.literal_guarantees {
+            let name = &e.column.name;
+            if seen.insert(name.to_string()) {
+                out.push(name.to_string());
+            }
+        }
+        out
     }
 }