diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index 937be1dd2cfc..806b41a353b4 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -41,7 +41,13 @@ use bytes::Bytes; use crate::errors::{ParquetError, Result}; use crate::util::bit_util::{self, BitReader, BitWriter, FromBitpacked}; -/// Maximum groups of 8 values per bit-packed run. Current value is 64. +/// Number of values in one bit-packed group. The Parquet RLE/bit-packing hybrid +/// format always bit-packs values in multiples of this count (see the +/// [format spec](https://github.com/apache/parquet-format/blob/master/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3): +/// "we always bit-pack a multiple of 8 values at a time"). +const BIT_PACK_GROUP_SIZE: usize = 8; + +/// Maximum groups of `BIT_PACK_GROUP_SIZE` values per bit-packed run. Current value is 64. const MAX_GROUPS_PER_BIT_PACKED_RUN: usize = 1 << 6; /// A RLE/Bit-Packing hybrid encoder. @@ -54,9 +60,9 @@ pub struct RleEncoder { bit_writer: BitWriter, // Buffered values for bit-packed runs. - buffered_values: [u64; 8], + buffered_values: [u64; BIT_PACK_GROUP_SIZE], - // Number of current buffered values. Must be less than 8. + // Number of current buffered values. Must be less than BIT_PACK_GROUP_SIZE. num_buffered_values: usize, // The current (also last) value that was written and the count of how many @@ -89,7 +95,7 @@ impl RleEncoder { RleEncoder { bit_width, bit_writer, - buffered_values: [0; 8], + buffered_values: [0; BIT_PACK_GROUP_SIZE], num_buffered_values: 0, current_value: 0, repeat_count: 0, @@ -101,22 +107,23 @@ impl RleEncoder { /// Returns the maximum buffer size to encode `num_values` values with /// `bit_width`. 
pub fn max_buffer_size(bit_width: u8, num_values: usize) -> usize { - // The maximum size occurs with the shortest possible runs of 8 - let num_runs = bit_util::ceil(num_values, 8); + // The maximum size occurs with the shortest possible runs of BIT_PACK_GROUP_SIZE + let num_runs = bit_util::ceil(num_values, BIT_PACK_GROUP_SIZE); - // The number of bytes in a run of 8 + // The number of bytes in a run of BIT_PACK_GROUP_SIZE let bytes_per_run = bit_width as usize; - // The maximum size if stored as shortest possible bit packed runs of 8 + // The maximum size if stored as shortest possible bit packed runs of BIT_PACK_GROUP_SIZE let bit_packed_max_size = num_runs + num_runs * bytes_per_run; - // The length of `8` VLQ encoded + // The length of `BIT_PACK_GROUP_SIZE` VLQ encoded let rle_len_prefix = 1; - // The length of an RLE run of 8 - let min_rle_run_size = rle_len_prefix + bit_util::ceil(bit_width as usize, 8); + // The length of an RLE run of BIT_PACK_GROUP_SIZE + let min_rle_run_size = + rle_len_prefix + bit_util::ceil(bit_width as usize, u8::BITS as usize); - // The maximum size if stored as shortest possible RLE runs of 8 + // The maximum size if stored as shortest possible RLE runs of BIT_PACK_GROUP_SIZE let rle_max_size = num_runs * min_rle_run_size; bit_packed_max_size.max(rle_max_size) @@ -125,16 +132,17 @@ impl RleEncoder { /// Encodes `value`, which must be representable with `bit_width` bits. #[inline] pub fn put(&mut self, value: u64) { - // This function buffers 8 values at a time. After seeing 8 values, it - // decides whether the current run should be encoded in bit-packed or RLE. + // This function buffers BIT_PACK_GROUP_SIZE values at a time. After seeing that + // many values, it decides whether the current run should be encoded in bit-packed + // or RLE. if self.current_value == value { self.repeat_count += 1; - if self.repeat_count > 8 { + if self.repeat_count > BIT_PACK_GROUP_SIZE { // A continuation of last value. No need to buffer. 
return; } } else { - if self.repeat_count >= 8 { + if self.repeat_count >= BIT_PACK_GROUP_SIZE { // The current RLE run has ended and we've gathered enough. Flush first. debug_assert_eq!(self.bit_packed_count, 0); self.flush_rle_run(); @@ -145,9 +153,9 @@ impl RleEncoder { self.buffered_values[self.num_buffered_values] = value; self.num_buffered_values += 1; - if self.num_buffered_values == 8 { + if self.num_buffered_values == BIT_PACK_GROUP_SIZE { // Buffered values are full. Flush them. - debug_assert_eq!(self.bit_packed_count % 8, 0); + debug_assert_eq!(self.bit_packed_count % BIT_PACK_GROUP_SIZE, 0); self.flush_buffered_values(); } } @@ -219,9 +227,9 @@ impl RleEncoder { if self.repeat_count > 0 && all_repeat { self.flush_rle_run(); } else { - // Buffer the last group of bit-packed values to 8 by padding with 0s. + // Buffer the last group of bit-packed values to BIT_PACK_GROUP_SIZE by padding with 0s. if self.num_buffered_values > 0 { - while self.num_buffered_values < 8 { + while self.num_buffered_values < BIT_PACK_GROUP_SIZE { self.buffered_values[self.num_buffered_values] = 0; self.num_buffered_values += 1; } @@ -239,7 +247,7 @@ impl RleEncoder { self.bit_writer.put_vlq_int(indicator_value as u64); self.bit_writer.put_aligned( self.current_value, - bit_util::ceil(self.bit_width as usize, 8), + bit_util::ceil(self.bit_width as usize, u8::BITS as usize), ); self.num_buffered_values = 0; self.repeat_count = 0; @@ -263,7 +271,7 @@ impl RleEncoder { // Called when ending a bit-packed run. 
Writes the indicator byte to the reserved // position in `bit_writer` fn finish_bit_packed_run(&mut self) { - let num_groups = self.bit_packed_count / 8; + let num_groups = self.bit_packed_count / BIT_PACK_GROUP_SIZE; let indicator_byte = ((num_groups << 1) | 1) as u8; self.bit_writer .put_aligned_offset(indicator_byte, 1, self.indicator_byte_pos as usize); @@ -272,20 +280,20 @@ impl RleEncoder { } fn flush_buffered_values(&mut self) { - if self.repeat_count >= 8 { + if self.repeat_count >= BIT_PACK_GROUP_SIZE { // Clear buffered values as they are not needed self.num_buffered_values = 0; if self.bit_packed_count > 0 { // In this case we have chosen to switch to RLE encoding. Close out the // previous bit-packed run. - debug_assert_eq!(self.bit_packed_count % 8, 0); + debug_assert_eq!(self.bit_packed_count % BIT_PACK_GROUP_SIZE, 0); self.finish_bit_packed_run(); } return; } self.bit_packed_count += self.num_buffered_values; - let num_groups = self.bit_packed_count / 8; + let num_groups = self.bit_packed_count / BIT_PACK_GROUP_SIZE; if num_groups + 1 >= MAX_GROUPS_PER_BIT_PACKED_RUN { // We've reached the maximum value that can be hold in a single bit-packed // run. @@ -359,7 +367,7 @@ impl RleDecoder { #[inline(never)] #[allow(unused)] pub fn get<T: FromBitpacked>(&mut self) -> Result<Option<T>> { - assert!(size_of::<T>() <= 8); + assert!(size_of::<T>() <= size_of::<u64>()); while self.rle_left == 0 && self.bit_packed_left == 0 { if !self.reload()?
{ @@ -395,7 +403,7 @@ impl RleDecoder { #[inline(never)] pub fn get_batch<T: FromBitpacked>(&mut self, buffer: &mut [T]) -> Result<usize> { - assert!(size_of::<T>() <= 8); + assert!(size_of::<T>() <= size_of::<u64>()); let mut values_read = 0; while values_read < buffer.len() { @@ -594,10 +602,10 @@ impl RleDecoder { return Ok(false); } if indicator_value & 1 == 1 { - self.bit_packed_left = ((indicator_value >> 1) * 8) as u32; + self.bit_packed_left = ((indicator_value >> 1) * BIT_PACK_GROUP_SIZE as i64) as u32; } else { self.rle_left = (indicator_value >> 1) as u32; - let value_width = bit_util::ceil(self.bit_width as usize, 8); + let value_width = bit_util::ceil(self.bit_width as usize, u8::BITS as usize); self.current_value = bit_reader.get_aligned::<u64>(value_width); self.current_value.ok_or_else(|| { general_err!("parquet_data_error: not enough data for RLE decoding") @@ -626,7 +634,7 @@ mod tests { let data = vec![0x03, 0x88, 0xC6, 0xFA]; let mut decoder: RleDecoder = RleDecoder::new(3); decoder.set_data(data.into()).unwrap(); - let mut buffer = vec![0; 8]; + let mut buffer = vec![0; BIT_PACK_GROUP_SIZE]; let expected = vec![0, 1, 2, 3, 4, 5, 6, 7]; let result = decoder.get_batch::<i32>(&mut buffer); assert!(result.is_ok()); @@ -810,14 +818,18 @@ mod tests { let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B]; let mut decoder: RleDecoder = RleDecoder::new(3); decoder.set_data(data.into()).unwrap(); - let mut buffer = vec![""; 8]; + let mut buffer = vec![""; BIT_PACK_GROUP_SIZE]; let expected = vec!["eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff"]; let skipped = decoder.skip(4).expect("skipping four values"); assert_eq!(skipped, 4); let remainder = decoder - .get_batch_with_dict::<&str>(dict.as_slice(), buffer.as_mut_slice(), 8) + .get_batch_with_dict::<&str>( dict.as_slice(), buffer.as_mut_slice(), BIT_PACK_GROUP_SIZE, ) .expect("getting remainder"); - assert_eq!(remainder, 8); + assert_eq!(remainder, BIT_PACK_GROUP_SIZE); assert_eq!(buffer, expected); } @@ -879,7 +891,7 @@ mod
tests { &values[..], width as u8, None, - 2 * (1 + bit_util::ceil(width as i64, 8) as i32), + 2 * (1 + bit_util::ceil(width as i64, u8::BITS as i64) as i32), ); } @@ -889,9 +901,12 @@ mod tests { for i in 0..101 { values.push(i % 2); } - let num_groups = bit_util::ceil(100, 8) as u8; + let num_groups = bit_util::ceil(100, BIT_PACK_GROUP_SIZE) as u8; expected_buffer.push((num_groups << 1) | 1); - expected_buffer.resize(expected_buffer.len() + 100 / 8, 0b10101010); + expected_buffer.resize( + expected_buffer.len() + 100 / BIT_PACK_GROUP_SIZE, + 0b10101010, + ); // For the last 4 0 and 1's, padded with 0. expected_buffer.push(0b00001010); @@ -902,12 +917,12 @@ mod tests { 1 + num_groups as i32, ); for width in 2..MAX_WIDTH + 1 { - let num_values = bit_util::ceil(100, 8) * 8; + let num_values = bit_util::ceil(100, BIT_PACK_GROUP_SIZE) * BIT_PACK_GROUP_SIZE; validate_rle( &values, width as u8, None, - 1 + bit_util::ceil(width as i64 * num_values, 8) as i32, + 1 + bit_util::ceil(width as i64 * num_values as i64, u8::BITS as i64) as i32, ); } } @@ -1001,9 +1016,9 @@ mod tests { .get_batch(&mut actual_values) .expect("get_batch() should be OK"); - // Should decode 8 values despite only encoding 6 as length of - // bit packed run is always multiple of 8 - assert_eq!(r, 8); + // Should decode BIT_PACK_GROUP_SIZE values despite only encoding 6 as length of + // bit packed run is always a multiple of BIT_PACK_GROUP_SIZE + assert_eq!(r, BIT_PACK_GROUP_SIZE); assert_eq!(actual_values[..6], values); assert_eq!(actual_values[6], 0); assert_eq!(actual_values[7], 0); @@ -1024,7 +1039,7 @@ mod tests { let num_values = 2002; // bit-packed header - let run_bytes = ceil(num_values * bit_width, 8) as u64; + let run_bytes = ceil(num_values * bit_width, u8::BITS as usize) as u64; writer.put_vlq_int((run_bytes << 1) | 1); for _ in 0..run_bytes { writer.put_aligned(0xFF_u8, 1);