diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs
index 9a907f4118a8..f9329cf74db6 100644
--- a/datafusion/datasource-parquet/src/mod.rs
+++ b/datafusion/datasource-parquet/src/mod.rs
@@ -30,6 +30,7 @@ pub mod metadata;
 mod metrics;
 mod opener;
 mod page_filter;
+mod push_decoder;
 mod reader;
 mod row_filter;
 mod row_group_filter;
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index e41bc82eb75f..11cf786a3d6b 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -19,16 +19,17 @@
 use crate::access_plan::PreparedAccessPlan;
 use crate::page_filter::PagePruningAccessPlanFilter;
-use crate::row_filter::{self, ParquetReadPlan, build_projection_read_plan};
+use crate::push_decoder::{DecoderBuilderConfig, PushDecoderStreamState};
+use crate::row_filter::{RowFilterGenerator, build_projection_read_plan};
 use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter};
 use crate::{
     ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
     apply_file_schema_type_coercions, coerce_int96_to_resolution,
 };
-use arrow::array::{RecordBatch, RecordBatchOptions};
+use arrow::array::RecordBatch;
 use arrow::datatypes::DataType;
 use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer};
-use datafusion_physical_expr::projection::{ProjectionExprs, Projector};
+use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_expr::utils::reassign_expr_columns;
 use datafusion_physical_expr_adapter::replace_columns_with_literals;
 use std::collections::{HashMap, VecDeque};
@@ -39,12 +40,10 @@
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
-use arrow::datatypes::{Schema, SchemaRef, TimeUnit};
+use arrow::datatypes::{SchemaRef, TimeUnit};
 use datafusion_common::encryption::FileDecryptionProperties;
 use datafusion_common::stats::Precision;
-use datafusion_common::{
-    ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err,
-};
+use datafusion_common::{ColumnStatistics, Result, ScalarValue, Statistics, exec_err};
 use datafusion_datasource::{PartitionedFile, TableSchema};
 use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
@@ -53,8 +52,8 @@ use datafusion_physical_expr_common::physical_expr::{
 };
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::metrics::{
-    BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder,
-    MetricCategory, PruningMetrics,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory,
+    PruningMetrics,
 };
 use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate};
@@ -66,18 +65,14 @@ use futures::{
     FutureExt, Stream, StreamExt, future::BoxFuture, ready, stream::BoxStream,
 };
 use log::debug;
-use parquet::DecodeResult;
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
-use parquet::arrow::arrow_reader::{
-    ArrowReaderMetadata, ArrowReaderOptions, RowFilter, RowSelectionPolicy,
-};
+use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::parquet_column;
-use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
 use parquet::basic::Type;
 use parquet::bloom_filter::Sbbf;
-use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
+use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
 
 /// Stateless Parquet morselizer implementation.
 ///
 ///
@@ -108,7 +103,7 @@ pub(super) struct ParquetMorselizer {
     /// Factory for instantiating parquet reader
     pub parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
     /// Should the filters be evaluated during the parquet scan using
-    /// [`DataFusionArrowPredicate`](row_filter::DatafusionArrowPredicate)?
+    /// [`DatafusionArrowPredicate`](crate::row_filter::DatafusionArrowPredicate)?
     pub pushdown_filters: bool,
     /// Should the filters be reordered to optimize the scan?
     pub reorder_filters: bool,
@@ -1158,8 +1153,17 @@ impl RowGroupsPrunedParquetOpen {
         );
         let (decoder, pending_decoders, remaining_limit) = {
-            let mut row_filter_generator =
-                RowFilterGenerator::new(&prepared, file_metadata.as_ref());
+            let pushdown_predicate = prepared
+                .pushdown_filters
+                .then_some(prepared.predicate.as_ref())
+                .flatten();
+            let mut row_filter_generator = RowFilterGenerator::new(
+                pushdown_predicate,
+                &prepared.physical_file_schema,
+                file_metadata.as_ref(),
+                prepared.reorder_predicates,
+                &prepared.file_metrics,
+            );
 
             // Split into consecutive runs of row groups that share the same filter
             // requirement. Fully matched row groups skip the RowFilter; others need it.
@@ -1227,27 +1231,23 @@
         let output_schema = Arc::clone(&prepared.output_schema);
         let files_ranges_pruned_statistics =
             prepared.file_metrics.files_ranges_pruned_statistics.clone();
-        let stream = futures::stream::unfold(
-            PushDecoderStreamState {
-                decoder,
-                pending_decoders,
-                remaining_limit,
-                reader: prepared.async_file_reader,
-                projector,
-                output_schema,
-                replace_schema,
-                arrow_reader_metrics,
-                predicate_cache_inner_records,
-                predicate_cache_records,
-                baseline_metrics: prepared.baseline_metrics,
-            },
-            |state| async move { state.transition().await },
-        )
-        .fuse();
+        let stream = PushDecoderStreamState {
+            decoder,
+            pending_decoders,
+            remaining_limit,
+            reader: prepared.async_file_reader,
+            projector,
+            output_schema,
+            replace_schema,
+            arrow_reader_metrics,
+            predicate_cache_inner_records,
+            predicate_cache_records,
+            baseline_metrics: prepared.baseline_metrics,
+        }
+        .into_stream();
 
         // Wrap the stream so a dynamic filter can stop the file scan early.
         if let Some(file_pruner) = prepared.file_pruner {
-            let stream = stream.boxed();
             Ok(EarlyStoppingStream::new(
                 stream,
                 file_pruner,
@@ -1255,244 +1255,8 @@
             )
             .boxed())
         } else {
-            Ok(stream.boxed())
-        }
-    }
-}
-
-/// Builds row filters for decoder runs.
-///
-/// A [`RowFilter`] must be owned by a decoder, so scans split across multiple
-/// decoder runs need a fresh filter for each run that evaluates row predicates.
-struct RowFilterGenerator<'a> {
-    predicate: Option<&'a Arc<dyn PhysicalExpr>>,
-    physical_file_schema: &'a SchemaRef,
-    file_metadata: &'a ParquetMetaData,
-    reorder_predicates: bool,
-    file_metrics: &'a ParquetFileMetrics,
-    first_row_filter: Option<RowFilter>,
-}
-
-impl<'a> RowFilterGenerator<'a> {
-    fn new(
-        prepared: &'a PreparedParquetOpen,
-        file_metadata: &'a ParquetMetaData,
-    ) -> Self {
-        let predicate = prepared
-            .pushdown_filters
-            .then_some(prepared.predicate.as_ref())
-            .flatten();
-
-        let mut generator = Self {
-            predicate,
-            physical_file_schema: &prepared.physical_file_schema,
-            file_metadata,
-            reorder_predicates: prepared.reorder_predicates,
-            file_metrics: &prepared.file_metrics,
-            first_row_filter: None,
-        };
-        generator.first_row_filter = generator.build_row_filter();
-        generator
-    }
-
-    fn has_row_filter(&self) -> bool {
-        self.first_row_filter.is_some()
-    }
-
-    fn next_filter(&mut self) -> Option<RowFilter> {
-        self.first_row_filter
-            .take()
-            .or_else(|| self.build_row_filter())
-    }
-
-    fn build_row_filter(&self) -> Option<RowFilter> {
-        let predicate = self.predicate?;
-        match row_filter::build_row_filter(
-            predicate,
-            self.physical_file_schema,
-            self.file_metadata,
-            self.reorder_predicates,
-            self.file_metrics,
-        ) {
-            Ok(Some(filter)) => Some(filter),
-            Ok(None) => None,
-            Err(e) => {
-                debug!("Ignoring error building row filter for '{predicate:?}': {e}");
-                None
-            }
-        }
-    }
-}
-
-/// State shared while building [`ParquetPushDecoder`]s for one file scan.
-///
-/// A scan can be split into multiple decoder runs when row groups have
-/// different filtering requirements. This config holds the options that apply
-/// to every [`ParquetPushDecoderBuilder`], while each run supplies its own
-/// [`PreparedAccessPlan`] and optional row filter.
-struct DecoderBuilderConfig<'a> {
-    read_plan: &'a ParquetReadPlan,
-    batch_size: usize,
-    arrow_reader_metrics: &'a ArrowReaderMetrics,
-    force_filter_selections: bool,
-    decoder_limit: Option<usize>,
-}
-
-impl DecoderBuilderConfig<'_> {
-    fn build(
-        &self,
-        prepared_access_plan: PreparedAccessPlan,
-        metadata: ArrowReaderMetadata,
-    ) -> ParquetPushDecoderBuilder {
-        let mut builder = ParquetPushDecoderBuilder::new_with_metadata(metadata)
-            .with_projection(self.read_plan.projection_mask.clone())
-            .with_batch_size(self.batch_size)
-            .with_metrics(self.arrow_reader_metrics.clone());
-        if self.force_filter_selections {
-            builder = builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
-        }
-        if let Some(row_selection) = prepared_access_plan.row_selection {
-            builder = builder.with_row_selection(row_selection);
-        }
-        builder = builder.with_row_groups(prepared_access_plan.row_group_indexes);
-        if let Some(limit) = self.decoder_limit {
-            builder = builder.with_limit(limit);
-        }
-        builder
-    }
-}
-
-/// State for a stream that decodes a single Parquet file using a push-based decoder.
-///
-/// The [`transition`](Self::transition) method drives the decoder in a loop: it requests
-/// byte ranges from the [`AsyncFileReader`], pushes the fetched data into the
-/// [`ParquetPushDecoder`], and yields projected [`RecordBatch`]es until the file is
-/// fully consumed.
-struct PushDecoderStreamState {
-    decoder: ParquetPushDecoder,
-    /// Additional decoders to process after the current one finishes.
-    /// Used when fully matched row groups split the scan into consecutive
-    /// runs with different filter configurations, maintaining original order.
-    pending_decoders: VecDeque<ParquetPushDecoder>,
-    /// Global remaining row limit across all decoder runs.
-    ///
-    /// Decoder-local limits are only safe for single-run scans. When the scan
-    /// is split across multiple decoders, the combined stream limit is enforced
-    /// here instead.
-    remaining_limit: Option<usize>,
-    reader: Box<dyn AsyncFileReader>,
-    projector: Projector,
-    output_schema: Arc<Schema>,
-    replace_schema: bool,
-    arrow_reader_metrics: ArrowReaderMetrics,
-    predicate_cache_inner_records: Gauge,
-    predicate_cache_records: Gauge,
-    baseline_metrics: BaselineMetrics,
-}
-
-impl PushDecoderStreamState {
-    /// Advances the decoder state machine until the next [`RecordBatch`] is
-    /// produced, the file is fully consumed, or an error occurs.
-    ///
-    /// On each iteration the decoder is polled via [`ParquetPushDecoder::try_decode`]:
-    /// - [`NeedsData`](DecodeResult::NeedsData) – the requested byte ranges are
-    ///   fetched from the [`AsyncFileReader`] and fed back into the decoder.
-    /// - [`Data`](DecodeResult::Data) – a decoded batch is projected and returned.
-    /// - [`Finished`](DecodeResult::Finished) – signals end-of-stream (`None`).
-    ///
-    /// Takes `self` by value (rather than `&mut self`) so the generated future
-    /// owns the state directly. This avoids a Stacked Borrows violation under
-    /// miri where `&mut self` creates a single opaque borrow that conflicts
-    /// with `unfold`'s ownership across yield points.
-    async fn transition(mut self) -> Option<(Result<RecordBatch>, Self)> {
-        loop {
-            if self.remaining_limit == Some(0) {
-                return None;
-            }
-            match self.decoder.try_decode() {
-                Ok(DecodeResult::NeedsData(ranges)) => {
-                    let data = self
-                        .reader
-                        .get_byte_ranges(ranges.clone())
-                        .await
-                        .map_err(DataFusionError::from);
-                    match data {
-                        Ok(data) => {
-                            if let Err(e) = self.decoder.push_ranges(ranges, data) {
-                                return Some((Err(DataFusionError::from(e)), self));
-                            }
-                        }
-                        Err(e) => return Some((Err(e), self)),
-                    }
-                }
-                Ok(DecodeResult::Data(batch)) => {
-                    let batch = if let Some(remaining_limit) = self.remaining_limit {
-                        if batch.num_rows() > remaining_limit {
-                            self.remaining_limit = Some(0);
-                            batch.slice(0, remaining_limit)
-                        } else {
-                            self.remaining_limit =
-                                Some(remaining_limit - batch.num_rows());
-                            batch
-                        }
-                    } else {
-                        batch
-                    };
-                    let mut timer = self.baseline_metrics.elapsed_compute().timer();
-                    self.copy_arrow_reader_metrics();
-                    let result = self.project_batch(&batch);
-                    timer.stop();
-                    // Release the borrow on baseline_metrics before moving self
-                    drop(timer);
-                    return Some((result, self));
-                }
-                Ok(DecodeResult::Finished) => {
-                    // If there are pending decoders (e.g. for consecutive runs
-                    // with different filter configurations), switch to the next.
-                    if let Some(next) = self.pending_decoders.pop_front() {
-                        self.decoder = next;
-                        continue;
-                    }
-                    return None;
-                }
-                Err(e) => {
-                    return Some((Err(DataFusionError::from(e)), self));
-                }
-            }
-        }
-    }
-
-    /// Copies metrics from ArrowReaderMetrics (the metrics collected by the
-    /// arrow-rs parquet reader) to the parquet file metrics for DataFusion
-    fn copy_arrow_reader_metrics(&self) {
-        if let Some(v) = self.arrow_reader_metrics.records_read_from_inner() {
-            self.predicate_cache_inner_records.set(v);
-        }
-        if let Some(v) = self.arrow_reader_metrics.records_read_from_cache() {
-            self.predicate_cache_records.set(v);
-        }
-    }
-
-    fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
-        let mut batch = self.projector.project_batch(batch)?;
-        if self.replace_schema {
-            // Ensure the output batch has the expected schema.
-            // This handles things like schema level and field level metadata, which may not be present
-            // in the physical file schema.
-            // It is also possible for nullability to differ; some writers create files with
-            // OPTIONAL fields even when there are no nulls in the data.
-            // In these cases it may make sense for the logical schema to be `NOT NULL`.
-            // RecordBatch::try_new_with_options checks that if the schema is NOT NULL
-            // the array cannot contain nulls, amongst other checks.
-            let (_stream_schema, arrays, num_rows) = batch.into_parts();
-            let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
-            batch = RecordBatch::try_new_with_options(
-                Arc::clone(&self.output_schema),
-                arrays,
-                &options,
-            )?;
+            Ok(stream)
         }
-        Ok(batch)
     }
 }
diff --git a/datafusion/datasource-parquet/src/push_decoder.rs b/datafusion/datasource-parquet/src/push_decoder.rs
new file mode 100644
index 000000000000..8b71be3e8de9
--- /dev/null
+++ b/datafusion/datasource-parquet/src/push_decoder.rs
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Push-based Parquet decoder setup and stream driver.
+//!
+//! This module owns the push-decoder lifecycle:
+//!
+//! - [`DecoderBuilderConfig`] holds the shared options applied to every
+//!   [`ParquetPushDecoderBuilder`] in a file scan, exposing a single `build`
+//!   entry point per decoder run.
+//! - [`PushDecoderStreamState`] is the per-file stream driver that polls one
+//!   or more decoders to completion, yielding projected [`RecordBatch`]es.
+//!   A scan can produce multiple decoders (for example, when fully matched
+//!   row groups split it into runs with different filter requirements); the
+//!   state machine drains them in order so the output is contiguous.
+//!
+//! The opener constructs both halves and hands the state off to
+//! [`PushDecoderStreamState::into_stream`] for consumption.
+
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, RecordBatchOptions};
+use arrow::datatypes::Schema;
+use futures::StreamExt;
+use futures::stream::BoxStream;
+use parquet::DecodeResult;
+use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use parquet::arrow::arrow_reader::{ArrowReaderMetadata, RowSelectionPolicy};
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_physical_expr::projection::Projector;
+use datafusion_physical_plan::metrics::{BaselineMetrics, Gauge};
+
+use crate::access_plan::PreparedAccessPlan;
+use crate::row_filter::ParquetReadPlan;
+
+/// Shared options applied to every [`ParquetPushDecoderBuilder`] in a file scan.
+///
+/// A single scan may produce multiple decoders (for example, when fully matched
+/// row groups split the scan into consecutive runs with different filter
+/// requirements). All decoders in that scan share the same projection, batch
+/// size, metrics sink, and selection policy.
+pub(crate) struct DecoderBuilderConfig<'a> {
+    pub(crate) read_plan: &'a ParquetReadPlan,
+    pub(crate) batch_size: usize,
+    pub(crate) arrow_reader_metrics: &'a ArrowReaderMetrics,
+    pub(crate) force_filter_selections: bool,
+    pub(crate) decoder_limit: Option<usize>,
+}
+
+impl DecoderBuilderConfig<'_> {
+    /// Build a [`ParquetPushDecoderBuilder`] for a single decoder run.
+    ///
+    /// The caller is expected to attach the run-specific
+    /// [`RowFilter`](parquet::arrow::arrow_reader::RowFilter) and predicate
+    /// cache size to the returned builder.
+    pub(crate) fn build(
+        &self,
+        prepared_access_plan: PreparedAccessPlan,
+        metadata: ArrowReaderMetadata,
+    ) -> ParquetPushDecoderBuilder {
+        let mut builder = ParquetPushDecoderBuilder::new_with_metadata(metadata)
+            .with_projection(self.read_plan.projection_mask.clone())
+            .with_batch_size(self.batch_size)
+            .with_metrics(self.arrow_reader_metrics.clone());
+        if self.force_filter_selections {
+            builder = builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
+        }
+        if let Some(row_selection) = prepared_access_plan.row_selection {
+            builder = builder.with_row_selection(row_selection);
+        }
+        builder = builder.with_row_groups(prepared_access_plan.row_group_indexes);
+        if let Some(limit) = self.decoder_limit {
+            builder = builder.with_limit(limit);
+        }
+        builder
+    }
+}
+
+/// State for a stream that decodes a single Parquet file using a push-based decoder.
+///
+/// The [`transition`](Self::transition) method drives the decoder in a loop: it requests
+/// byte ranges from the [`AsyncFileReader`], pushes the fetched data into the
+/// [`ParquetPushDecoder`], and yields projected [`RecordBatch`]es until the file is
+/// fully consumed.
+pub(crate) struct PushDecoderStreamState {
+    pub(crate) decoder: ParquetPushDecoder,
+    /// Additional decoders to process after the current one finishes.
+    /// Used when fully matched row groups split the scan into consecutive
+    /// runs with different filter configurations, maintaining original order.
+    pub(crate) pending_decoders: VecDeque<ParquetPushDecoder>,
+    /// Global remaining row limit across all decoder runs.
+    ///
+    /// Decoder-local limits are only safe for single-run scans. When the scan
+    /// is split across multiple decoders, the combined stream limit is enforced
+    /// here instead.
+    pub(crate) remaining_limit: Option<usize>,
+    pub(crate) reader: Box<dyn AsyncFileReader>,
+    pub(crate) projector: Projector,
+    pub(crate) output_schema: Arc<Schema>,
+    pub(crate) replace_schema: bool,
+    pub(crate) arrow_reader_metrics: ArrowReaderMetrics,
+    pub(crate) predicate_cache_inner_records: Gauge,
+    pub(crate) predicate_cache_records: Gauge,
+    pub(crate) baseline_metrics: BaselineMetrics,
+}
+
+impl PushDecoderStreamState {
+    /// Drive the state machine to completion as a [`futures::Stream`] of record batches.
+    ///
+    /// The returned stream is fused and boxed so the caller can wrap it (for
+    /// example, with an early-stopping adapter) without naming the unfold type.
+    pub(crate) fn into_stream(self) -> BoxStream<'static, Result<RecordBatch>> {
+        futures::stream::unfold(self, |state| async move { state.transition().await })
+            .fuse()
+            .boxed()
+    }
+
+    /// Advances the decoder state machine until the next [`RecordBatch`] is
+    /// produced, the file is fully consumed, or an error occurs.
+    ///
+    /// On each iteration the decoder is polled via [`ParquetPushDecoder::try_decode`]:
+    /// - [`NeedsData`](DecodeResult::NeedsData) – the requested byte ranges are
+    ///   fetched from the [`AsyncFileReader`] and fed back into the decoder.
+    /// - [`Data`](DecodeResult::Data) – a decoded batch is projected and returned.
+    /// - [`Finished`](DecodeResult::Finished) – signals end-of-stream (`None`).
+    ///
+    /// Takes `self` by value (rather than `&mut self`) so the generated future
+    /// owns the state directly. This avoids a Stacked Borrows violation under
+    /// miri where `&mut self` creates a single opaque borrow that conflicts
+    /// with `unfold`'s ownership across yield points.
+    async fn transition(mut self) -> Option<(Result<RecordBatch>, Self)> {
+        loop {
+            if self.remaining_limit == Some(0) {
+                return None;
+            }
+            match self.decoder.try_decode() {
+                Ok(DecodeResult::NeedsData(ranges)) => {
+                    let data = self
+                        .reader
+                        .get_byte_ranges(ranges.clone())
+                        .await
+                        .map_err(DataFusionError::from);
+                    match data {
+                        Ok(data) => {
+                            if let Err(e) = self.decoder.push_ranges(ranges, data) {
+                                return Some((Err(DataFusionError::from(e)), self));
+                            }
+                        }
+                        Err(e) => return Some((Err(e), self)),
+                    }
+                }
+                Ok(DecodeResult::Data(batch)) => {
+                    let batch = if let Some(remaining_limit) = self.remaining_limit {
+                        if batch.num_rows() > remaining_limit {
+                            self.remaining_limit = Some(0);
+                            batch.slice(0, remaining_limit)
+                        } else {
+                            self.remaining_limit =
+                                Some(remaining_limit - batch.num_rows());
+                            batch
+                        }
+                    } else {
+                        batch
+                    };
+                    let mut timer = self.baseline_metrics.elapsed_compute().timer();
+                    self.copy_arrow_reader_metrics();
+                    let result = self.project_batch(&batch);
+                    timer.stop();
+                    // Release the borrow on baseline_metrics before moving self
+                    drop(timer);
+                    return Some((result, self));
+                }
+                Ok(DecodeResult::Finished) => {
+                    // If there are pending decoders (e.g. for consecutive runs
+                    // with different filter configurations), switch to the next.
+                    if let Some(next) = self.pending_decoders.pop_front() {
+                        self.decoder = next;
+                        continue;
+                    }
+                    return None;
+                }
+                Err(e) => {
+                    return Some((Err(DataFusionError::from(e)), self));
+                }
+            }
+        }
+    }
+
+    /// Copies metrics from ArrowReaderMetrics (the metrics collected by the
+    /// arrow-rs parquet reader) to the parquet file metrics for DataFusion
+    fn copy_arrow_reader_metrics(&self) {
+        if let Some(v) = self.arrow_reader_metrics.records_read_from_inner() {
+            self.predicate_cache_inner_records.set(v);
+        }
+        if let Some(v) = self.arrow_reader_metrics.records_read_from_cache() {
+            self.predicate_cache_records.set(v);
+        }
+    }
+
+    fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
+        let mut batch = self.projector.project_batch(batch)?;
+        if self.replace_schema {
+            // Ensure the output batch has the expected schema.
+            // This handles things like schema level and field level metadata, which may not be present
+            // in the physical file schema.
+            // It is also possible for nullability to differ; some writers create files with
+            // OPTIONAL fields even when there are no nulls in the data.
+            // In these cases it may make sense for the logical schema to be `NOT NULL`.
+            // RecordBatch::try_new_with_options checks that if the schema is NOT NULL
+            // the array cannot contain nulls, amongst other checks.
+            let (_stream_schema, arrays, num_rows) = batch.into_parts();
+            let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
+            batch = RecordBatch::try_new_with_options(
+                Arc::clone(&self.output_schema),
+                arrays,
+                &options,
+            )?;
+        }
+        Ok(batch)
+    }
+}
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index 6dfaa731ae7f..f19dbd6c6fa6 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -1082,6 +1082,70 @@ pub fn build_row_filter(
         .map(|filters| Some(RowFilter::new(filters)))
 }
 
+/// Builds row filters for decoder runs.
+///
+/// A [`RowFilter`] must be owned by a decoder, so scans split across multiple
+/// decoder runs need a fresh filter for each run that evaluates row predicates.
+/// The first filter is built eagerly during construction so callers can cheaply
+/// query [`has_row_filter`](Self::has_row_filter) before splitting the scan.
+pub(crate) struct RowFilterGenerator<'a> {
+    predicate: Option<&'a Arc<dyn PhysicalExpr>>,
+    physical_file_schema: &'a SchemaRef,
+    file_metadata: &'a ParquetMetaData,
+    reorder_predicates: bool,
+    file_metrics: &'a ParquetFileMetrics,
+    first_row_filter: Option<RowFilter>,
+}
+
+impl<'a> RowFilterGenerator<'a> {
+    pub(crate) fn new(
+        predicate: Option<&'a Arc<dyn PhysicalExpr>>,
+        physical_file_schema: &'a SchemaRef,
+        file_metadata: &'a ParquetMetaData,
+        reorder_predicates: bool,
+        file_metrics: &'a ParquetFileMetrics,
+    ) -> Self {
+        let mut generator = Self {
+            predicate,
+            physical_file_schema,
+            file_metadata,
+            reorder_predicates,
+            file_metrics,
+            first_row_filter: None,
+        };
+        generator.first_row_filter = generator.build();
+        generator
+    }
+
+    pub(crate) fn has_row_filter(&self) -> bool {
+        self.first_row_filter.is_some()
+    }
+
+    pub(crate) fn next_filter(&mut self) -> Option<RowFilter> {
+        self.first_row_filter.take().or_else(|| self.build())
+    }
+
+    fn build(&self) -> Option<RowFilter> {
+        let predicate = self.predicate?;
+        match build_row_filter(
+            predicate,
+            self.physical_file_schema,
+            self.file_metadata,
+            self.reorder_predicates,
+            self.file_metrics,
+        ) {
+            Ok(Some(filter)) => Some(filter),
+            Ok(None) => None,
+            Err(e) => {
+                log::debug!(
+                    "Ignoring error building row filter for '{predicate:?}': {e}"
+                );
+                None
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
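
For orientation, a minimal sketch (not part of this diff) of how the opener is expected to combine `RowFilterGenerator`, `DecoderBuilderConfig`, and `PushDecoderStreamState` when a scan is split into several decoder runs. The names `runs`, `run.access_plan`, `run.needs_filter`, `config`, `metadata`, and the surrounding local variables are hypothetical stand-ins for the opener's real bookkeeping, and `with_row_filter` / `build` are assumed to be the usual arrow-rs builder methods referenced by the doc comments above; only the three types named come from this diff.

```rust
// Hypothetical decoder-run loop; see the hedging note in the lead-in above.
let mut row_filter_generator = RowFilterGenerator::new(
    pushdown_predicate,
    &physical_file_schema,
    file_metadata.as_ref(),
    reorder_predicates,
    &file_metrics,
);

let mut decoders = VecDeque::new();
for run in runs {
    // Shared projection / batch size / metrics come from the config; each run
    // supplies its own access plan.
    let mut builder = config.build(run.access_plan, metadata.clone());
    // Fully matched runs skip the RowFilter; the others get a fresh filter,
    // because each decoder must own its RowFilter.
    if run.needs_filter {
        if let Some(filter) = row_filter_generator.next_filter() {
            builder = builder.with_row_filter(filter);
        }
    }
    decoders.push_back(builder.build()?);
}

// The first decoder is driven immediately; the rest are drained in order by
// PushDecoderStreamState when the active decoder reports DecodeResult::Finished.
let decoder = decoders.pop_front().expect("at least one decoder run");
let stream = PushDecoderStreamState {
    decoder,
    pending_decoders: decoders,
    remaining_limit,
    reader,
    projector,
    output_schema,
    replace_schema,
    arrow_reader_metrics,
    predicate_cache_inner_records,
    predicate_cache_records,
    baseline_metrics,
}
.into_stream();
```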