diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index f46bfc62d050e..7f9c51de8a56a 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -19,11 +19,11 @@ use crate::access_plan::PreparedAccessPlan; use crate::page_filter::PagePruningAccessPlanFilter; -use crate::row_filter::build_projection_read_plan; +use crate::row_filter::{self, ParquetReadPlan, build_projection_read_plan}; use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter}; use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, - apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter, + apply_file_schema_type_coercions, coerce_int96_to_resolution, }; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; @@ -76,7 +76,9 @@ use parquet::arrow::parquet_column; use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder}; use parquet::basic::Type; use parquet::bloom_filter::Sbbf; -use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; +use parquet::file::metadata::{ + PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData, +}; /// Stateless Parquet morselizer implementation. /// @@ -1076,41 +1078,6 @@ impl RowGroupsPrunedParquetOpen { let file_metadata = Arc::clone(reader_metadata.metadata()); let rg_metadata = file_metadata.row_groups(); - // Filter pushdown: evaluate predicates during scan. - // Keep the predicate around so we can rebuild RowFilter per decoder run - // when fully matched row groups split the scan into multiple decoders. - let pushdown_predicate = prepared - .pushdown_filters - .then_some(prepared.predicate.clone()) - .flatten(); - - let try_build_row_filter = - |predicate: &Arc| -> Option { - match row_filter::build_row_filter( - predicate, - &prepared.physical_file_schema, - file_metadata.as_ref(), - prepared.reorder_predicates, - &prepared.file_metrics, - ) { - Ok(Some(filter)) => Some(filter), - Ok(None) => None, - Err(e) => { - debug!( - "Ignoring error building row filter for '{predicate:?}': {e}" - ); - None - } - } - }; - - // Build the first RowFilter eagerly; it will be reused for the first - // filtered decoder run and rebuilt from pushdown_predicate for any - // additional filtered runs. - let mut first_row_filter = - pushdown_predicate.as_ref().and_then(&try_build_row_filter); - let has_row_filter = first_row_filter.is_some(); - // Prune by limit if limit is set and limit order is not sensitive if let (Some(limit), false) = (prepared.limit, prepared.preserve_order) { row_groups.prune_by_limit(limit, rg_metadata, &prepared.file_metrics); @@ -1128,34 +1095,23 @@ impl RowGroupsPrunedParquetOpen { && !access_plan.is_empty() && let Some(page_pruning_predicate) = page_pruning_predicate { - let (page_pruned_access_plan, pages_skipped_by_fully_matched) = - page_pruning_predicate.prune_plan_with_page_index( + let page_pruning_result = page_pruning_predicate + .prune_plan_with_page_index_and_metrics( access_plan, &prepared.physical_file_schema, reader_metadata.parquet_schema(), file_metadata.as_ref(), &prepared.file_metrics, ); - access_plan = page_pruned_access_plan; + access_plan = page_pruning_result.access_plan; ParquetFileMetrics::add_page_index_pages_skipped_by_fully_matched( &prepared.metrics, prepared.partition_index, &prepared.file_name, - pages_skipped_by_fully_matched, + page_pruning_result.pages_skipped_by_fully_matched, ); } - // Prepare access plans (extract row groups and row selection). - let prepare_access_plan = - |plan: ParquetAccessPlan| -> Result { - let mut prepared_access_plan = plan.prepare(rg_metadata)?; - if prepared.reverse_row_groups { - prepared_access_plan = - prepared_access_plan.reverse(file_metadata.as_ref())?; - } - Ok(prepared_access_plan) - }; - let arrow_reader_metrics = ArrowReaderMetrics::enabled(); let read_plan = build_projection_read_plan( prepared.projection.expr_iter(), @@ -1163,69 +1119,60 @@ impl RowGroupsPrunedParquetOpen { reader_metadata.parquet_schema(), ); - // Split into consecutive runs of row groups that share the same filter - // requirement. Fully matched row groups skip the RowFilter; others need it. - // Reverse the run order for reverse scans so the combined decoder stream - // preserves the requested global row group order. - let mut runs = access_plan.split_runs(has_row_filter); - if prepared.reverse_row_groups { - runs.reverse(); - } - let run_count = runs.len(); - let decoder_limit = prepared.limit.filter(|_| run_count == 1); - let remaining_limit = prepared.limit.filter(|_| run_count > 1); - - // Helper: configure a decoder builder with shared options from - // the prepared plan. - let build_decoder = |prepared_access_plan: PreparedAccessPlan, - metadata: ArrowReaderMetadata| - -> Result { - let mut builder = ParquetPushDecoderBuilder::new_with_metadata(metadata) - .with_projection(read_plan.projection_mask.clone()) - .with_batch_size(prepared.batch_size) - .with_metrics(arrow_reader_metrics.clone()); - if prepared.force_filter_selections { - builder = - builder.with_row_selection_policy(RowSelectionPolicy::Selectors); - } - if let Some(row_selection) = prepared_access_plan.row_selection { - builder = builder.with_row_selection(row_selection); + let (decoder, pending_decoders, remaining_limit) = { + let mut row_filter_generator = + RowFilterGenerator::new(&prepared, file_metadata.as_ref()); + + // Split into consecutive runs of row groups that share the same filter + // requirement. Fully matched row groups skip the RowFilter; others need it. + // Reverse the run order for reverse scans so the combined decoder stream + // preserves the requested global row group order. + let mut runs = access_plan.split_runs(row_filter_generator.has_row_filter()); + if prepared.reverse_row_groups { + runs.reverse(); } - builder = builder.with_row_groups(prepared_access_plan.row_group_indexes); - if let Some(limit) = decoder_limit { - builder = builder.with_limit(limit); - } - Ok(builder) - }; + let run_count = runs.len(); + let decoder_limit = prepared.limit.filter(|_| run_count == 1); + let remaining_limit = prepared.limit.filter(|_| run_count > 1); + + let decoder_config = DecoderBuilderConfig { + read_plan: &read_plan, + batch_size: prepared.batch_size, + arrow_reader_metrics: &arrow_reader_metrics, + force_filter_selections: prepared.force_filter_selections, + decoder_limit, + }; - // Build a decoder per run. - let mut decoders = VecDeque::with_capacity(runs.len()); - for run in runs { - let prepared_access_plan = prepare_access_plan(run.access_plan)?; - let mut builder = - build_decoder(prepared_access_plan, reader_metadata.clone())?; - if run.needs_filter { - // Reuse pre-built filter for the first filtered run, - // rebuild from the predicate for subsequent ones. - let row_filter = first_row_filter.take().or_else(|| { - pushdown_predicate.as_ref().and_then(&try_build_row_filter) - }); - if let Some(row_filter) = row_filter { - builder = builder.with_row_filter(row_filter); - } - if let Some(max_predicate_cache_size) = prepared.max_predicate_cache_size - { - builder = - builder.with_max_predicate_cache_size(max_predicate_cache_size); + // Build a decoder per run. + let mut decoders = VecDeque::with_capacity(runs.len()); + for run in runs { + let prepared_access_plan = prepare_access_plan( + run.access_plan, + rg_metadata, + file_metadata.as_ref(), + prepared.reverse_row_groups, + )?; + let mut builder = + decoder_config.build(prepared_access_plan, reader_metadata.clone()); + if run.needs_filter { + if let Some(row_filter) = row_filter_generator.next_filter() { + builder = builder.with_row_filter(row_filter); + } + if let Some(max_predicate_cache_size) = + prepared.max_predicate_cache_size + { + builder = builder + .with_max_predicate_cache_size(max_predicate_cache_size); + } } + decoders.push_back(builder.build()?); } - decoders.push_back(builder.build()?); - } - let decoder = decoders - .pop_front() - .expect("at least one decoder must be created"); - let pending_decoders = decoders; + let decoder = decoders + .pop_front() + .expect("at least one decoder must be created"); + (decoder, decoders, remaining_limit) + }; let predicate_cache_inner_records = prepared.file_metrics.predicate_cache_inner_records.clone(); @@ -1280,6 +1227,121 @@ impl RowGroupsPrunedParquetOpen { } } +/// Builds row filters for decoder runs. +/// +/// A [`RowFilter`] must be owned by a decoder, so scans split across multiple +/// decoder runs need a fresh filter for each run that evaluates row predicates. +struct RowFilterGenerator<'a> { + predicate: Option<&'a Arc>, + physical_file_schema: &'a SchemaRef, + file_metadata: &'a ParquetMetaData, + reorder_predicates: bool, + file_metrics: &'a ParquetFileMetrics, + first_row_filter: Option, +} + +impl<'a> RowFilterGenerator<'a> { + fn new( + prepared: &'a PreparedParquetOpen, + file_metadata: &'a ParquetMetaData, + ) -> Self { + let predicate = prepared + .pushdown_filters + .then_some(prepared.predicate.as_ref()) + .flatten(); + + let mut generator = Self { + predicate, + physical_file_schema: &prepared.physical_file_schema, + file_metadata, + reorder_predicates: prepared.reorder_predicates, + file_metrics: &prepared.file_metrics, + first_row_filter: None, + }; + generator.first_row_filter = generator.build_row_filter(); + generator + } + + fn has_row_filter(&self) -> bool { + self.first_row_filter.is_some() + } + + fn next_filter(&mut self) -> Option { + self.first_row_filter + .take() + .or_else(|| self.build_row_filter()) + } + + fn build_row_filter(&self) -> Option { + let predicate = self.predicate?; + match row_filter::build_row_filter( + predicate, + self.physical_file_schema, + self.file_metadata, + self.reorder_predicates, + self.file_metrics, + ) { + Ok(Some(filter)) => Some(filter), + Ok(None) => None, + Err(e) => { + debug!("Ignoring error building row filter for '{predicate:?}': {e}"); + None + } + } + } +} + +fn prepare_access_plan( + plan: ParquetAccessPlan, + rg_metadata: &[RowGroupMetaData], + file_metadata: &ParquetMetaData, + reverse_row_groups: bool, +) -> Result { + let mut prepared_access_plan = plan.prepare(rg_metadata)?; + if reverse_row_groups { + prepared_access_plan = prepared_access_plan.reverse(file_metadata)?; + } + Ok(prepared_access_plan) +} + +/// State shared while building [`ParquetPushDecoder`]s for one file scan. +/// +/// A scan can be split into multiple decoder runs when row groups have +/// different filtering requirements. This config holds the options that apply +/// to every [`ParquetPushDecoderBuilder`], while each run supplies its own +/// [`PreparedAccessPlan`] and optional row filter. +struct DecoderBuilderConfig<'a> { + read_plan: &'a ParquetReadPlan, + batch_size: usize, + arrow_reader_metrics: &'a ArrowReaderMetrics, + force_filter_selections: bool, + decoder_limit: Option, +} + +impl DecoderBuilderConfig<'_> { + fn build( + &self, + prepared_access_plan: PreparedAccessPlan, + metadata: ArrowReaderMetadata, + ) -> ParquetPushDecoderBuilder { + let mut builder = ParquetPushDecoderBuilder::new_with_metadata(metadata) + .with_projection(self.read_plan.projection_mask.clone()) + .with_batch_size(self.batch_size) + .with_metrics(self.arrow_reader_metrics.clone()); + if self.force_filter_selections { + builder = builder.with_row_selection_policy(RowSelectionPolicy::Selectors); + } + if let Some(row_selection) = prepared_access_plan.row_selection { + builder = builder.with_row_selection(row_selection); + } + builder = builder.with_row_groups(prepared_access_plan.row_group_indexes); + if let Some(limit) = self.decoder_limit { + builder = builder.with_limit(limit); + } + builder + } +} + /// State for a stream that decodes a single Parquet file using a push-based decoder. /// /// The [`transition`](Self::transition) method drives the decoder in a loop: it requests diff --git a/datafusion/datasource-parquet/src/page_filter.rs b/datafusion/datasource-parquet/src/page_filter.rs index fa3e2dd44d9ab..795a63268b6a9 100644 --- a/datafusion/datasource-parquet/src/page_filter.rs +++ b/datafusion/datasource-parquet/src/page_filter.rs @@ -115,6 +115,26 @@ pub struct PagePruningAccessPlanFilter { predicates: Vec, } +/// Result of applying page-index pruning to a [`ParquetAccessPlan`]. +pub(crate) struct PagePruningResult { + pub(crate) access_plan: ParquetAccessPlan, + /// Pages skipped because the containing row group was fully matched by + /// row-group statistics. + pub(crate) pages_skipped_by_fully_matched: usize, +} + +impl PagePruningResult { + fn new( + access_plan: ParquetAccessPlan, + pages_skipped_by_fully_matched: usize, + ) -> Self { + Self { + access_plan, + pages_skipped_by_fully_matched, + } + } +} + impl PagePruningAccessPlanFilter { /// Create a new [`PagePruningAccessPlanFilter`] from a physical /// expression. @@ -154,24 +174,44 @@ impl PagePruningAccessPlanFilter { /// Returns an updated [`ParquetAccessPlan`] by applying predicates to the /// parquet page index, if any pub fn prune_plan_with_page_index( + &self, + access_plan: ParquetAccessPlan, + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor, + parquet_metadata: &ParquetMetaData, + file_metrics: &ParquetFileMetrics, + ) -> ParquetAccessPlan { + self.prune_plan_with_page_index_and_metrics( + access_plan, + arrow_schema, + parquet_schema, + parquet_metadata, + file_metrics, + ) + .access_plan + } + + /// Returns an updated [`ParquetAccessPlan`] and metrics by applying predicates + /// to the parquet page index, if any. + pub(crate) fn prune_plan_with_page_index_and_metrics( &self, mut access_plan: ParquetAccessPlan, arrow_schema: &Schema, parquet_schema: &SchemaDescriptor, parquet_metadata: &ParquetMetaData, file_metrics: &ParquetFileMetrics, - ) -> (ParquetAccessPlan, usize) { + ) -> PagePruningResult { // scoped timer updates on drop let _timer_guard = file_metrics.page_index_eval_time.timer(); if self.predicates.is_empty() { - return (access_plan, 0); + return PagePruningResult::new(access_plan, 0); } let page_index_predicates = &self.predicates; let groups = parquet_metadata.row_groups(); if groups.is_empty() { - return (access_plan, 0); + return PagePruningResult::new(access_plan, 0); } if parquet_metadata.offset_index().is_none() @@ -182,7 +222,7 @@ impl PagePruningAccessPlanFilter { parquet_metadata.offset_index().is_some(), parquet_metadata.column_index().is_some() ); - return (access_plan, 0); + return PagePruningResult::new(access_plan, 0); }; // track the total number of rows that should be skipped @@ -329,7 +369,7 @@ impl PagePruningAccessPlanFilter { file_metrics .page_index_pages_pruned .add_matched(total_pages_select); - (access_plan, total_pages_skipped_by_fully_matched) + PagePruningResult::new(access_plan, total_pages_skipped_by_fully_matched) } /// Returns the number of filters in the [`PagePruningAccessPlanFilter`]