Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
/// Creates a new streaming level encoder appropriate for the writer version.
fn create_level_encoder(max_level: i16, props: &WriterProperties) -> LevelEncoder {
match props.writer_version() {
WriterVersion::PARQUET_1_0 => LevelEncoder::v1_streaming(Encoding::RLE, max_level),
WriterVersion::PARQUET_1_0 => LevelEncoder::v1_streaming(max_level),
WriterVersion::PARQUET_2_0 => LevelEncoder::v2_streaming(max_level),
}
}
Expand Down
34 changes: 7 additions & 27 deletions parquet/src/encodings/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,32 @@ use std::mem;

use super::rle::RleEncoder;

use crate::basic::Encoding;
use crate::data_type::AsBytes;
use crate::util::bit_util::{BitWriter, num_required_bits};
use crate::util::bit_util::num_required_bits;

/// Encoder for definition/repetition levels.
/// Currently only supports Rle and BitPacked (dev/null) encoding, including v2.
pub enum LevelEncoder {
Rle(RleEncoder),
RleV2(RleEncoder),
BitPacked(u8, BitWriter),
}

impl LevelEncoder {
/// Creates a new streaming level encoder for Data Page v1.
///
/// Unlike [`v1`](Self::v1), this does not require knowing the number of values
/// This does not require knowing the number of values
/// upfront, making it suitable for incremental encoding where levels are fed in
/// as they arrive via [`put`](Self::put).
pub fn v1_streaming(encoding: Encoding, max_level: i16) -> Self {
pub fn v1_streaming(max_level: i16) -> Self {
let bit_width = num_required_bits(max_level as u64);
match encoding {
Encoding::RLE => {
// Reserve space for length header
let buffer = vec![0u8; 4];
LevelEncoder::Rle(RleEncoder::new_from_buf(bit_width, buffer))
}
#[allow(deprecated)]
Encoding::BIT_PACKED => {
LevelEncoder::BitPacked(bit_width, BitWriter::new_from_buf(Vec::new()))
}
_ => panic!("Unsupported encoding type {encoding}"),
}
// Reserve space for length header
let buffer = vec![0u8; 4];
LevelEncoder::Rle(RleEncoder::new_from_buf(bit_width, buffer))
}

/// Creates a new streaming RLE level encoder for Data Page v2.
///
/// Unlike [`v2`](Self::v2), this does not require knowing the number of values
/// This does not require knowing the number of values
/// upfront, making it suitable for incremental encoding where levels are fed in
/// as they arrive via [`put`](Self::put).
pub fn v2_streaming(max_level: i16) -> Self {
Expand All @@ -80,12 +69,6 @@ impl LevelEncoder {
num_encoded += 1;
}
}
LevelEncoder::BitPacked(bit_width, ref mut encoder) => {
for value in buffer {
encoder.put_value(*value as u64, bit_width as usize);
num_encoded += 1;
}
}
}
num_encoded
}
Expand All @@ -106,7 +89,6 @@ impl LevelEncoder {
encoded_data
}
LevelEncoder::RleV2(encoder) => encoder.consume(),
LevelEncoder::BitPacked(_, encoder) => encoder.consume(),
}
}

Expand All @@ -126,7 +108,6 @@ impl LevelEncoder {
f(data)
}
LevelEncoder::RleV2(encoder) => f(encoder.flush_buffer()),
LevelEncoder::BitPacked(_, encoder) => f(encoder.flush_buffer()),
};
match self {
LevelEncoder::Rle(encoder) => {
Expand All @@ -135,7 +116,6 @@ impl LevelEncoder {
encoder.skip(mem::size_of::<i32>());
}
LevelEncoder::RleV2(encoder) => encoder.clear(),
LevelEncoder::BitPacked(_, encoder) => encoder.clear(),
}
result
}
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/util/test_common/page_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl DataPageBuilderImpl {
if max_level <= 0 {
return 0;
}
let mut level_encoder = LevelEncoder::v1_streaming(Encoding::RLE, max_level);
let mut level_encoder = LevelEncoder::v1_streaming(max_level);
level_encoder.put(levels);
let encoded_levels = level_encoder.consume();
// Actual encoded bytes (without length offset)
Expand Down
Loading