[POC for #9934] parquet: add ReverseSerializedPageReader #9937
zhuqi-lucas wants to merge 7 commits into apache:main from
Conversation
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds a proof-of-concept Parquet page reader that iterates page traversal order in reverse (dictionary first, then data pages from last to first) using OffsetIndex, to enable reverse streaming primitives discussed in #9934.
Changes:
- Introduces `ReverseSerializedPageReader<R: ChunkReader>` implementing `PageReader + Iterator<Item = Result<Page>>`
- Wires the new module into `parquet::file` and adds an extensive test suite validating forward/reverse equivalence at page- and value-level
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `parquet/src/file/reverse_serialized_reader.rs` | Implements the reverse page-order reader and adds targeted tests for correctness and state-machine behavior |
| `parquet/src/file/mod.rs` | Exposes the new module via `pub mod reverse_serialized_reader;` |
alamb
left a comment
Thanks @zhuqi-lucas -- I can see why reversing just the row groups still leaves additional potential improvements
I am worried that this type of code will add lots of complexity to the parquet reader (basically we now need to maintain two code paths -- the forward reader and the reverse reader)
Is there any way to reuse the existing page reader, but add some sort of abstraction that defines which pages are read in what order? At the moment it just blasts down the PageIndex in order -- but maybe we could provide an alternate OffsetIndex or something that was reversed and use the same reader path 🤔
Introduces a new `PageReader` that emits pages from a column chunk in reverse page order using `OffsetIndex.page_locations`. The dictionary page (if any) is emitted first; data pages are then emitted from the last `PageLocation` to the first. Pages are still decoded in their native forward direction — only traversal order is reversed.

This is a building block for the reverse-streaming use case described in apache#9934, in particular `WHERE filter ORDER BY DESC LIMIT N` queries where `RowSelection` cannot pre-select the matching rows.

Limitations of this Phase 1 POC:
- Encryption is not supported.
- `peek_next_page` does not populate `num_rows`.
- Requires a column chunk with `OffsetIndex` (page index).

Tests verify byte-level equivalence between forward and reverse traversal across uncompressed, dictionary-enabled, and Snappy-compressed column chunks.
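The traversal order described above (dictionary first, then data pages back-to-front) can be sketched with stand-in types. This is illustrative only, not the crate's actual `PageLocation` or reader code:

```rust
// Hypothetical stand-in for an OffsetIndex page entry.
#[derive(Clone, Debug, PartialEq)]
struct PageLocation {
    offset: u64,
}

/// Returns the reverse traversal order for a column chunk: the
/// dictionary page (if any) is yielded first, because data pages
/// depend on the loaded dictionary; data pages follow from the last
/// `PageLocation` to the first. Pages themselves still decode forward.
fn reverse_traversal(
    dict: Option<PageLocation>,
    data: &[PageLocation],
) -> Vec<PageLocation> {
    let mut order = Vec::new();
    if let Some(d) = dict {
        order.push(d);
    }
    order.extend(data.iter().rev().cloned());
    order
}

fn main() {
    let data: Vec<PageLocation> = (1u64..=3).map(|o| PageLocation { offset: o }).collect();
    let order = reverse_traversal(Some(PageLocation { offset: 0 }), &data);
    let offsets: Vec<u64> = order.iter().map(|p| p.offset).collect();
    // Dictionary page first, then data pages last-to-first.
    assert_eq!(offsets, vec![0u64, 3, 2, 1]);
    println!("{:?}", offsets);
}
```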
Adds 17 additional test cases (20 total) covering:
- Page buffer/metadata equivalence:
  - UNCOMPRESSED, SNAPPY, GZIP, ZSTD compression codecs
  - With and without dictionary encoding
  - DataPage V1 and V2 (PARQUET_2_0 writer version)
- Value-level decode equivalence (using `ColumnReaderImpl`):
  - INT32, INT64, BYTE_ARRAY physical types
  - With and without dictionary encoding
  - DataPage V1 and V2
- NULL handling:
  - OPTIONAL columns with ~30-33% nulls
  - Both DataPage V1 and V2
- State-machine API surface:
  - `peek_next_page` reports `is_dict` correctly across the full lifecycle
  - `skip_next_page` advances state with and without a dictionary page
- Edge cases:
  - Single-data-page column chunks
  - 50k-row stress test (>20 pages)
  - `Iterator` trait impl

Also fixes a `skip_next_page` semantics bug where the `NeedDict` -> `Data` transition without a real dictionary did not consume any data page, diverging from `SerializedPageReader::skip_next_page` behavior.
…ader

- Move `page_locations` and `dictionary_page` onto the struct; reduce `ReverseState` to a tag + cursor. Eliminates `mem::replace` churn and makes the state machine easier to reason about.
- Read pages *before* committing the state transition, so a transient I/O error no longer silently advances past a page.
- `peek_next_page` now returns `None` when the chunk has neither a dictionary page nor any data pages, matching `get_next_page`.
- Include `compressed_page_size` and `offset` in the malformed-size error to make diagnosing bad offset indexes easier.
- Mark the type `#[doc(hidden)]` and document API stability — this surface is experimental until the design from apache#9934 is settled.
- Add `peek_returns_none_for_empty_chunk` regression test.
Phase 1's empirical claim is per-page cost parity with the existing forward `SerializedPageReader`. This benchmark verifies that claim by draining the same column chunk in both directions and measuring first-page latency. Both UNCOMPRESSED and SNAPPY codecs are covered.

Sample numbers on Apple M-series (--quick mode, 100k INT32 values, ~98 data pages, no dictionary):

    uncompressed/forward_drain       9.73 µs
    uncompressed/reverse_drain       9.34 µs
    uncompressed/forward_first_page   164 ns
    uncompressed/reverse_first_page   156 ns
    snappy/forward_drain             21.5 µs
    snappy/reverse_drain             21.2 µs
    snappy/forward_first_page         313 ns
    snappy/reverse_first_page         283 ns

Forward and reverse readers are within 1-5% across all metrics, well inside measurement noise.

Phase 2's value-add (early termination, low peak memory under filter+reverse) is not exercised here — that requires record-batch-level integration and will be benchmarked separately.
Adds a benchmark group that simulates the existing row-group-level reverse strategy (apache/datafusion#18817 — forward-decode the entire chunk, reverse the value buffer, take N) versus the page-level reverse strategy enabled by `ReverseSerializedPageReader` (decode just enough pages from the back to gather N values, reverse those, take N).

This is the closest in-crate proxy for the value-add Phase 2/3 will deliver to query engines: when a caller only needs the last N rows of a sorted column chunk, page-level reverse avoids decoding all 98 pages of the chunk.

Numbers on Apple M-series (--quick mode, 100k INT32 values, 98 pages, no dictionary, uncompressed):

    n=10:   row_group_sim 28.8 µs vs page_reverse 564 ns  ~51x
    n=100:  row_group_sim 29.0 µs vs page_reverse 602 ns  ~48x
    n=1024: row_group_sim 28.9 µs vs page_reverse 505 ns  ~57x

Row-group time is roughly constant (always a full chunk decode); page-reverse time stays low while N <= the last page's row count, then grows by one page-decode per additional page consumed.
The existing `time_to_first_n_page_reverse` does a single trailing
reverse on the accumulated value buffer. That is correct only when
`n <= PAGE_ROW_COUNT_LIMIT` (one page worth) — for larger `n` the
buffer holds concatenated forward-decoded pages in *emission* order
(`page_N forward, page_{N-1} forward, ...`) and one trailing reverse
would scramble the cross-page boundaries.
Adds `time_to_first_n_page_reverse_cross_page`, which reads one page
worth at a time and reverses *only that segment* before continuing.
This is correct for any `n` and is the algorithmic shape Phase 2
(RecordBatchReader integration) will use: per-page reverse + emit, no
cross-page accumulation.
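The correctness argument above can be shown with plain vectors. This is a sketch, not the bench code: pages decode forward, but are emitted back-to-front, so a single trailing reverse scrambles cross-page boundaries while a per-page reverse yields the fully reversed stream:

```rust
/// WRONG for more than one page: concatenate forward-decoded pages in
/// emission order (page_N, page_{N-1}, ...) and reverse the whole buffer.
fn trailing_reverse(pages_back_to_front: &[Vec<i32>]) -> Vec<i32> {
    let mut buf: Vec<i32> = pages_back_to_front.concat();
    buf.reverse(); // scrambles cross-page boundaries when >1 page
    buf
}

/// Correct for any n: reverse only each page's own segment, then emit.
fn per_page_reverse(pages_back_to_front: &[Vec<i32>]) -> Vec<i32> {
    let mut out = Vec::new();
    for page in pages_back_to_front {
        let mut seg = page.clone();
        seg.reverse(); // reverse this page's values only
        out.extend(seg);
    }
    out
}

fn main() {
    // Chunk values 1..=6 split into two pages of 3, emitted page 2 first.
    let pages = vec![vec![4, 5, 6], vec![1, 2, 3]];
    assert_eq!(per_page_reverse(&pages), vec![6, 5, 4, 3, 2, 1]); // fully reversed
    assert_eq!(trailing_reverse(&pages), vec![3, 2, 1, 6, 5, 4]); // scrambled
}
```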
Bench now exercises both single-page (n = 10 / 100 / 1024) and
cross-page (n = 2k / 10k / 50k) paths. Numbers on Apple M-series
(--quick, 100k INT32, 98 pages, no dict, uncompressed):
n=10 row_group_sim 26.7µs page_reverse 565ns ~47x
n=100 row_group_sim 26.7µs page_reverse 565ns ~47x
n=1024 row_group_sim 26.7µs page_reverse 472ns ~57x
n=2000 row_group_sim 26.8µs page_reverse_cross 770ns ~35x
n=10000 row_group_sim 26.6µs page_reverse_cross 2.85µs ~9.3x
n=50000 row_group_sim 26.7µs page_reverse_cross 14.5µs ~1.8x
Speedup follows total_pages / pages_needed_for_n and converges to ~1x
as n approaches the full chunk — the expected shape for a primitive
that saves work proportional to the unread tail.
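The speedup shape can be written down as a back-of-envelope model, assuming a fixed rows-per-page and roughly constant per-page decode cost (the fixture's 98 pages of 1024 rows; names here are illustrative):

```rust
/// Pages that must be decoded from the back to gather n values,
/// i.e. ceil(n / rows_per_page).
fn pages_needed(n: usize, rows_per_page: usize) -> usize {
    (n + rows_per_page - 1) / rows_per_page
}

/// Model speedup: row-group reverse always decodes all pages,
/// page reverse decodes only pages_needed(n).
fn model_speedup(total_pages: usize, n: usize, rows_per_page: usize) -> f64 {
    total_pages as f64 / pages_needed(n, rows_per_page) as f64
}

fn main() {
    // Bench fixture: 98 pages, 1024 rows per page.
    assert_eq!(pages_needed(10, 1024), 1);      // single-page cases
    assert_eq!(pages_needed(2_000, 1024), 2);   // first cross-page case
    assert_eq!(pages_needed(50_000, 1024), 49); // approaching the full chunk
    // Model converges toward ~1x as n approaches the chunk size,
    // matching the measured ~1.8x at n=50000.
    println!("n=50000 model speedup ~{:.1}x", model_speedup(98, 50_000, 1024));
}
```

The measured numbers sit somewhat below the model because the row-group simulation also pays the trailing reverse and the page path pays per-page setup, but the `total_pages / pages_needed_for_n` shape is the same.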
…s` flag

Per @alamb's review (apache#9937 review-4255127096): the standalone `ReverseSerializedPageReader` was ~95% duplicate of `SerializedPageReader::Pages` mode — the only structural difference was `pop_back` vs `pop_front`. Maintaining two near-identical readers adds ongoing complexity without any benefit visible to downstream callers.

This collapses the two paths into one:
- `SerializedPageReaderState::Pages` gains a `reverse: bool` field
- new builder method `SerializedPageReader::with_reverse_pages(true)` flips the flag (and seeds `page_index` to `len - 1` so the page ordinal stays correct, including for encryption AAD)
- `get_next_page` / `peek_next_page` / `skip_next_page` / `peek_next_page_offset` switch `pop_front`/`front` to `pop_back`/`back` and `+= 1` to `saturating_sub(1)` under the flag
- the dictionary page is still emitted first in both modes (data pages depend on the loaded dictionary)

The dedicated `reverse_serialized_reader.rs` module is removed (~1080 lines deleted). Net change: -190 lines source, single code path.

The 21 reverse-correctness tests are migrated to a `reverse_pages_tests` sub-module in `serialized_reader.rs` and updated to use the new builder API. One additional test is added (`with_reverse_pages_is_noop_in_values_mode`) verifying the flag is harmless when the reader is constructed in `Values` mode (no `OffsetIndex`).

The `reverse_page_reader` criterion bench is updated to use the new API. Bench numbers are within run-to-run noise of the pre-refactor result: ~50x speedup for `n <= one page`, scaling down with `n` as expected.

Tracking: <apache#9934>
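The collapsed design can be sketched as one state machine whose direction is a single flag. The types below are illustrative stand-ins, not the crate's, but they mirror the commit's `pop_front`/`pop_back` and `+= 1`/`saturating_sub(1)` switches and the `len - 1` ordinal seeding:

```rust
use std::collections::VecDeque;

/// Stand-in for the `Pages`-mode state: a queue of page offsets plus a
/// page ordinal, with a single `reverse` flag instead of a second reader.
struct PagesState {
    locations: VecDeque<u64>, // stand-in for OffsetIndex page offsets
    page_index: usize,        // page ordinal (matters for encryption AAD)
    reverse: bool,
}

impl PagesState {
    fn new(locations: Vec<u64>, reverse: bool) -> Self {
        // Reverse mode seeds the ordinal to len - 1 so the back-most
        // page keeps its original forward ordinal.
        let page_index = if reverse { locations.len().saturating_sub(1) } else { 0 };
        Self { locations: locations.into(), page_index, reverse }
    }

    /// Yields (offset, ordinal) for the next page in traversal order.
    fn next_page(&mut self) -> Option<(u64, usize)> {
        let off = if self.reverse {
            self.locations.pop_back()?
        } else {
            self.locations.pop_front()?
        };
        let ordinal = self.page_index;
        if self.reverse {
            self.page_index = self.page_index.saturating_sub(1);
        } else {
            self.page_index += 1;
        }
        Some((off, ordinal))
    }
}

fn main() {
    let mut fwd = PagesState::new(vec![100, 200, 300], false);
    let mut rev = PagesState::new(vec![100, 200, 300], true);
    assert_eq!(fwd.next_page(), Some((100, 0)));
    assert_eq!(rev.next_page(), Some((300, 2))); // back-most page keeps ordinal N-1
    assert_eq!(rev.next_page(), Some((200, 1)));
}
```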
Thank you @alamb, you are right. Most of it is now just `SerializedPageReader::new(...)?.with_reverse_pages(true)`; the flag flips iteration direction. Tried the pre-reversed-Vec route first, but the existing dict inference uses …
Which issue does this PR close?
Tracking issue: #9934 — parquet: Support reverse page iteration for reverse streaming with filter pushdown.
This PR is a proof-of-concept (Phase 1) for the building block proposed in that issue. Its purpose is to ground the design discussion in real code, demonstrate feasibility, and collect feedback on the API shape before any final API commitment.
Rationale for this change
DataFusion has merged a row-group-level reverse scan (apache/datafusion#18817) for `ORDER BY DESC LIMIT N` queries on ascending-sorted Parquet files. The current granularity (~128 MB per row group) means high first-batch latency and high peak memory; for `WHERE filter ORDER BY DESC LIMIT N`, where `RowSelection` cannot pre-select matching rows, the reverse stream is the only correct strategy.

A page-level reverse iterator is the missing primitive. The key insight (per #9934) is to separate two distinct concepts:
- decode direction (pages are always decoded in their native forward direction)
- traversal order (which page is read next, driven by `OffsetIndex.page_locations`)

This PR implements the second.
What changes are included in this PR?
A small extension to the existing `SerializedPageReader::Pages` mode plus a builder flag, instead of a separate reader type. The earlier revision of this PR introduced a standalone `ReverseSerializedPageReader`; per @alamb's review, the two readers were ~95% duplicate (only `pop_front` vs `pop_back` differed), so they have been collapsed into a single code path.

Implementation details:
- `SerializedPageReaderState::Pages` gains a `reverse: bool` field.
- `with_reverse_pages(true)` flips the flag and seeds `page_index` to `len - 1`, so the page ordinal stays correct (this matters for encryption AAD — reverse mode emits the back-most page first, which has ordinal `N - 1` in the original forward order).
- `get_next_page` / `peek_next_page` / `skip_next_page` / `peek_next_page_offset` branch `pop_front` ↔ `pop_back`, `front` ↔ `back`, and `+= 1` ↔ `saturating_sub(1)` under the flag.
- The `parquet::file::reverse_serialized_reader` module is gone (~1080 lines deleted).
- `with_reverse_pages` is marked `#[doc(hidden)]` with a `# Stability` note: the surface is intentionally experimental until the design from #9934 (parquet: Support reverse page iteration for reverse streaming with filter pushdown) is settled.
- Encryption now works for reverse iteration too, since the crypto context is shared with the forward path.
Limitations (Phase 1 POC, intentional)
- `peek_next_page` does not populate `num_rows` precisely (the current implementation tracks a coarse value that is correct in forward mode and reversed accurately in reverse mode).
- Reverse iteration requires that an `OffsetIndex` is available; in `Values` mode the flag is a no-op.

Test coverage (21 tests)
Located in `parquet::file::serialized_reader::reverse_pages_tests`. All 21 tests run on the unified `SerializedPageReader::with_reverse_pages(true)` API:
- Page buffer/metadata equivalence across codecs and encodings
- Value-level decode equivalence (using `ColumnReaderImpl`)
- State machine: `peek_next_page` over the full lifecycle; `skip_next_page` with / without dict; empty-chunk peek
- `with_reverse_pages` is a no-op in `Values` mode

The page-level tests verify byte-identical buffers between forward and reverse output. The decode-level tests further verify that, after page-boundary slicing and re-arrangement, `ColumnReaderImpl` produces identical values and `def_levels`.

Benchmarks
Two criterion bench groups in `parquet/benches/reverse_page_reader.rs`. All numbers below are `--quick`, Apple M-series, 100 000 INT32 rows, 98 data pages, no dictionary.

1. Per-page cost parity (`reverse_page_reader/{codec}`)

The empirical claim of Phase 1 — reverse iteration adds no per-page overhead vs the forward path. All within measurement noise.
2. Time-to-first-N reversed values (`time_to_first_n_reversed_values/...`)

Compares the existing row-group-level reverse strategy used by DataFusion (forward-decode the entire chunk, reverse the value buffer, take `n`) against the page-level reverse strategy enabled by this PR.

The bench has two page-level functions:
- `page_reverse` — single trailing reverse, correct only for `n <= PAGE_ROW_COUNT_LIMIT` (one page worth of rows). Used for the small-`n` cases below.
- `page_reverse_cross` — per-page reverse + append, correct for any `n`. This is the algorithmic shape Phase 2 (RecordBatchReader integration) will use: per-page reverse + emit, no cross-page accumulation.

Speedup follows `total_pages / pages_needed_for_n` and converges to ~1x as `n` approaches the full chunk — the expected shape for a primitive that saves work proportional to the unread tail.

3. Peak buffer memory
Time savings come with a corresponding memory reduction. The row-group-reverse strategy must materialize the entire chunk before applying the trailing reverse + truncate; page-level reverse only needs to hold enough pages to satisfy `n`.

For the bench fixture (100 000 INT32, 98 pages, 1 024 rows / page):

Real-world numbers track the same ratio at a much larger absolute scale. The default Parquet writer produces ~128 MB row groups with ~1 MB pages; under DataFusion's existing row-group-reverse strategy, peak buffer is ~128 MB per row group. Page-level reverse with a small `n` (the typical TopK case) needs ~1 MB, a ~128x reduction. This is the structural memory benefit highlighted in #9934, and it scales independently of the per-page-cost-parity result above.

Caveats
- The bench reads from in-memory `Bytes`, so I/O latency is not modeled. On real S3 / object-store backends, both readers issue one byte-range read per page; the page-reverse advantage holds and is amplified by network round-trips dominating decode cost.
- With `RowFilter` pushdown, both strategies pay filter-evaluation cost; the page-reverse advantage compounds because the row-group strategy filter-evaluates pages it ultimately discards. This is the killer Phase 3 use case from #9934 (parquet: Support reverse page iteration for reverse streaming with filter pushdown): `WHERE filter ORDER BY DESC LIMIT N`.

Why this is an RFC
The remaining work for end-to-end usefulness sits above this primitive:
- A `ParquetRecordBatchReader` mode that aligns batch boundaries with page boundaries and reverses each page's `RecordBatch` after decoding, so the final stream is fully reversed for any `n`. The bench's `page_reverse_cross` function shows the algorithmic shape.

Specific feedback I'd appreciate:
- Is `#[doc(hidden)]` + a `# Stability` note enough, or would you prefer an explicit `experimental` cargo feature?
- `peek_next_page` semantics: precise `num_rows` in reverse mode is symmetric to the forward computation but uses `back` instead of `front`; the current implementation already does this. Want it explicitly tested as a separate case?
- A `ParquetRecordBatchReaderBuilder::with_reverse_pages(true)` flag (mirroring this PR's lower-level addition), or should reverse iteration be exposed only through a higher-level `with_order(Asc | Desc)` style?

Are there any user-facing changes?
A new (`#[doc(hidden)]`) builder method `SerializedPageReader::with_reverse_pages`. No existing APIs are changed. Encryption is supported in reverse mode through the shared crypto context.

Verification