From 433b1bdd115871c52092426a3bd233a5e03f95f9 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Tue, 21 Apr 2026 00:15:09 +0000 Subject: [PATCH 1/8] Add PFOR encoding core implementation Implements the PFOR (Patched Frame of Reference) integer compression encoding for INT32 and INT64 columns in the pfor package: - PforConstants: header/vector sizes, max exceptions (65535) - PforEncoderDecoder: histogram-based cost model for optimal bit width - PforValuesWriter: IntPforValuesWriter + LongPforValuesWriter with vector-buffered encoding and interleaved page layout - PforValuesReader: abstract base with lazy per-vector decoding - PforValuesReaderForInt: INT32 decoder using BytePacker - PforValuesReaderForLong: INT64 decoder using BytePackerForLong --- .../column/values/pfor/PforConstants.java | 81 ++++ .../values/pfor/PforEncoderDecoder.java | 161 +++++++ .../column/values/pfor/PforValuesReader.java | 151 ++++++ .../values/pfor/PforValuesReaderForInt.java | 142 ++++++ .../values/pfor/PforValuesReaderForLong.java | 147 ++++++ .../column/values/pfor/PforValuesWriter.java | 445 ++++++++++++++++++ 6 files changed, 1127 insertions(+) create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforConstants.java create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforEncoderDecoder.java create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReader.java create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForInt.java create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForLong.java create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesWriter.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforConstants.java b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforConstants.java new file mode 100644 index 0000000000..81afe98c07 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforConstants.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.pfor; + +import org.apache.parquet.Preconditions; + +/** + * Constants for the PFOR (Patched Frame of Reference) encoding. + * + *

PFOR encoding compresses integer columns (INT32/INT64) by: + *

    + *
  1. Subtracting the minimum value (Frame of Reference)
  2. + *
  3. Choosing an optimal bit width via a cost model
  4. + *
  5. Bit-packing the deltas at the chosen width
  6. + *
  7. Storing outlier values (exceptions) separately with their positions
  8. + *
+ */ +public final class PforConstants { + + private PforConstants() { + // Utility class + } + + // Page header fields (7 bytes total) + public static final int PFOR_PACKING_MODE_FOR = 0; + public static final int PFOR_HEADER_SIZE = 7; + + public static final int DEFAULT_VECTOR_SIZE = 1024; + public static final int DEFAULT_VECTOR_SIZE_LOG = 10; + + // Capped at 15 (vectorSize=32768) because num_exceptions is uint16, + // so vectorSize must not exceed 65535 to avoid overflow when all values are exceptions. + static final int MAX_LOG_VECTOR_SIZE = 15; + static final int MIN_LOG_VECTOR_SIZE = 3; + + // Maximum exceptions per vector (uint16) + public static final int MAX_EXCEPTIONS = 65535; + + // Per-vector metadata sizes in bytes + // INT32: frame_of_reference(4) + bit_width(1) + num_exceptions(2) = 7 + public static final int INT32_VECTOR_INFO_SIZE = 7; + // INT64: frame_of_reference(8) + bit_width(1) + num_exceptions(2) = 11 + public static final int INT64_VECTOR_INFO_SIZE = 11; + + // Value byte widths + public static final int INT32_VALUE_BYTE_WIDTH = 4; + public static final int INT64_VALUE_BYTE_WIDTH = 8; + + /** Validates vector size: must be a power of 2 in [2^MIN_LOG .. 2^MAX_LOG]. */ + static int validateVectorSize(int vectorSize) { + Preconditions.checkArgument( + vectorSize > 0 && (vectorSize & (vectorSize - 1)) == 0, + "Vector size must be a power of 2, got: %s", + vectorSize); + int logSize = Integer.numberOfTrailingZeros(vectorSize); + Preconditions.checkArgument( + logSize >= MIN_LOG_VECTOR_SIZE && logSize <= MAX_LOG_VECTOR_SIZE, + "Vector size log2 must be between %s and %s, got: %s (vectorSize=%s)", + MIN_LOG_VECTOR_SIZE, + MAX_LOG_VECTOR_SIZE, + logSize, + vectorSize); + return vectorSize; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforEncoderDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforEncoderDecoder.java new file mode 100644 index 0000000000..1c56eb38f4 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforEncoderDecoder.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.pfor; + +/** + * Core PFOR encoding/decoding logic with histogram-based cost model. + * + *

The cost model selects the optimal bit width by evaluating: + *

+ * total_cost(b) = num_elements * b + num_exceptions(b) * (16 + value_bits)
+ * 
+ * where {@code value_bits} is 32 for INT32 and 64 for INT64, and + * {@code num_exceptions(b)} is the count of deltas requiring more than {@code b} bits. + */ +public final class PforEncoderDecoder { + + private PforEncoderDecoder() { + // Utility class + } + + /** Result of the optimal bit width search. */ + public static final class BitWidthResult { + public final int bitWidth; + public final int numExceptions; + + BitWidthResult(int bitWidth, int numExceptions) { + this.bitWidth = bitWidth; + this.numExceptions = numExceptions; + } + } + + /** + * Find the optimal bit width for packing INT32 unsigned deltas. + * + *

Builds a histogram of bits required per delta, then evaluates each + * candidate bit width from 0 to 32 using the cost model. + * + * @param deltas unsigned deltas (values[] - min), treated as unsigned int + * @param numElements number of elements + * @return the optimal bit width and resulting number of exceptions + */ + public static BitWidthResult findOptimalBitWidthForInt(int[] deltas, int numElements) { + // Histogram: bitsHist[b] = count of deltas that need exactly b bits + int[] bitsHist = new int[33]; // 0..32 + for (int i = 0; i < numElements; i++) { + bitsHist[bitWidthForInt(deltas[i])]++; + } + + // Exception cost per exception: position(16 bits) + value(32 bits) = 48 bits + final long exceptionBitsPerValue = 16 + 32; + + long bestCost = Long.MAX_VALUE; + int bestBitWidth = 0; + int bestExceptions = 0; + + // exceptionsAbove[b] = number of deltas requiring > b bits + int exceptionsAbove = numElements; // at b=0, all nonzero deltas might be exceptions + // Actually: deltas requiring > 0 bits = all deltas with bitsRequired > 0 + // We need to track cumulative: exceptionsAbove starts at numElements - bitsHist[0] + // But let's compute it properly by starting from b=0. + // At b=0, only deltas requiring 0 bits (i.e., delta==0) are NOT exceptions. + // Correction: at candidate bit_width = b, values needing bitsRequired > b are exceptions. + // bitsRequired(0) = 0, so delta==0 needs 0 bits. At b=0, exceptions = values with bitsRequired > 0. + exceptionsAbove = numElements - bitsHist[0]; + + for (int b = 0; b <= 32; b++) { + long packingCost = (long) numElements * b; + long exceptionCost = (long) exceptionsAbove * exceptionBitsPerValue; + long totalCost = packingCost + exceptionCost; + + if (totalCost < bestCost) { + bestCost = totalCost; + bestBitWidth = b; + bestExceptions = exceptionsAbove; + } + + // Move to next candidate: values requiring exactly (b+1) bits are no longer exceptions + if (b < 32) { + exceptionsAbove -= bitsHist[b + 1]; + } + } + + return new BitWidthResult(bestBitWidth, bestExceptions); + } + + /** + * Find the optimal bit width for packing INT64 unsigned deltas. + * + * @param deltas unsigned deltas (values[] - min), treated as unsigned long + * @param numElements number of elements + * @return the optimal bit width and resulting number of exceptions + */ + public static BitWidthResult findOptimalBitWidthForLong(long[] deltas, int numElements) { + // Histogram: bitsHist[b] = count of deltas that need exactly b bits + int[] bitsHist = new int[65]; // 0..64 + for (int i = 0; i < numElements; i++) { + bitsHist[bitWidthForLong(deltas[i])]++; + } + + // Exception cost per exception: position(16 bits) + value(64 bits) = 80 bits + final long exceptionBitsPerValue = 16 + 64; + + long bestCost = Long.MAX_VALUE; + int bestBitWidth = 0; + int bestExceptions = 0; + + int exceptionsAbove = numElements - bitsHist[0]; + + for (int b = 0; b <= 64; b++) { + long packingCost = (long) numElements * b; + long exceptionCost = (long) exceptionsAbove * exceptionBitsPerValue; + long totalCost = packingCost + exceptionCost; + + if (totalCost < bestCost) { + bestCost = totalCost; + bestBitWidth = b; + bestExceptions = exceptionsAbove; + } + + if (b < 64) { + exceptionsAbove -= bitsHist[b + 1]; + } + } + + return new BitWidthResult(bestBitWidth, bestExceptions); + } + + /** + * Returns the number of bits required to represent an unsigned int value. + * Returns 0 for value == 0. + */ + public static int bitWidthForInt(int value) { + if (value == 0) return 0; + return 32 - Integer.numberOfLeadingZeros(value); + } + + /** + * Returns the number of bits required to represent an unsigned long value. + * Returns 0 for value == 0. + */ + public static int bitWidthForLong(long value) { + if (value == 0) return 0; + return 64 - Long.numberOfLeadingZeros(value); + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReader.java new file mode 100644 index 0000000000..25275b8179 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReader.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.pfor; + +import static org.apache.parquet.column.values.pfor.PforConstants.*; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.ParquetDecodingException; + +/** + * Abstract base class for PFOR values readers with lazy per-vector decoding. + * + *

Reads PFOR-encoded values from the interleaved page layout: + *

+ * ┌─────────┬──────────────────────┬──────────────┬──────────────┬─────┐
+ * │ Header  │ Offset Array         │ Vector 0     │ Vector 1     │ ... │
+ * │ 7 bytes │ 4B × numVectors │ (interleaved)│ (interleaved)│     │
+ * └─────────┴──────────────────────┴──────────────┴──────────────┴─────┘
+ * 
+ * + *

Each vector is decoded lazily on first access. Skipping values does not + * trigger decoding of intermediate vectors. + */ +abstract class PforValuesReader extends ValuesReader { + + protected int vectorSize; + protected int totalCount; + protected int numVectors; + protected int currentIndex; + protected int currentVectorIndex; + protected int valueByteWidth; + + protected int[] vectorOffsets; + protected ByteBuffer vectorsData; + protected int offsetArraySize; + + PforValuesReader() { + this.currentIndex = 0; + this.totalCount = 0; + this.currentVectorIndex = -1; + } + + @Override + public void initFromPage(int valuesCount, ByteBufferInputStream stream) + throws ParquetDecodingException, IOException { + ByteBuffer headerBuf = stream.slice(PFOR_HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + int packingMode = headerBuf.get() & 0xFF; + int logVectorSize = headerBuf.get() & 0xFF; + int valueBW = headerBuf.get() & 0xFF; + int numElements = headerBuf.getInt(); + + if (packingMode != PFOR_PACKING_MODE_FOR) { + throw new ParquetDecodingException("Unsupported PFOR packing mode: " + packingMode); + } + if (logVectorSize < MIN_LOG_VECTOR_SIZE || logVectorSize > MAX_LOG_VECTOR_SIZE) { + throw new ParquetDecodingException("Invalid PFOR log vector size: " + logVectorSize + + ", must be between " + MIN_LOG_VECTOR_SIZE + " and " + MAX_LOG_VECTOR_SIZE); + } + if (valueBW != INT32_VALUE_BYTE_WIDTH && valueBW != INT64_VALUE_BYTE_WIDTH) { + throw new ParquetDecodingException( + "Invalid PFOR value byte width: " + valueBW + ", must be 4 or 8"); + } + if (numElements < 0) { + throw new ParquetDecodingException("Invalid PFOR element count: " + numElements); + } + + this.vectorSize = 1 << logVectorSize; + this.totalCount = numElements; + this.valueByteWidth = valueBW; + this.numVectors = (numElements + vectorSize - 1) / vectorSize; + this.currentIndex = 0; + this.currentVectorIndex = -1; + + this.offsetArraySize = numVectors * Integer.BYTES; + ByteBuffer offsetBuf = stream.slice(offsetArraySize).order(ByteOrder.LITTLE_ENDIAN); + this.vectorOffsets = new int[numVectors]; + for (int v = 0; v < numVectors; v++) { + vectorOffsets[v] = offsetBuf.getInt(); + } + + // Slice remaining bytes into a 0-based view so decodeVector can use + // absolute get methods (vectorsData.get(pos)) directly. + int remainingBytes = (int) stream.available(); + ByteBuffer rawSlice = stream.slice(remainingBytes); + this.vectorsData = rawSlice.slice().order(ByteOrder.LITTLE_ENDIAN); + + allocateDecodedBuffer(vectorSize); + } + + protected int getVectorLength(int vectorIdx) { + if (vectorIdx < numVectors - 1) { + return vectorSize; + } + // Last vector may be partial + int lastVectorLen = totalCount % vectorSize; + return lastVectorLen == 0 ? vectorSize : lastVectorLen; + } + + // Offsets in the page are relative to the compression body (after header), + // but vectorsData starts after the offset array, so adjust. + protected int getVectorDataPosition(int vectorIdx) { + return vectorOffsets[vectorIdx] - offsetArraySize; + } + + @Override + public void skip() { + skip(1); + } + + @Override + public void skip(int n) { + if (n < 0 || currentIndex + n > totalCount) { + throw new ParquetDecodingException(String.format( + "Cannot skip this many elements. Current index: %d. Skip %d. Total count: %d", + currentIndex, n, totalCount)); + } + currentIndex += n; + } + + protected void ensureVectorDecoded() { + int vectorIdx = currentIndex / vectorSize; + if (vectorIdx != currentVectorIndex) { + decodeVector(vectorIdx); + currentVectorIndex = vectorIdx; + } + } + + protected abstract void allocateDecodedBuffer(int capacity); + + protected abstract void decodeVector(int vectorIdx); +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForInt.java b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForInt.java new file mode 100644 index 0000000000..420a188098 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForInt.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.pfor; + +import static org.apache.parquet.column.values.pfor.PforConstants.*; + +import java.nio.ByteBuffer; +import org.apache.parquet.column.values.bitpacking.BytePacker; +import org.apache.parquet.column.values.bitpacking.Packer; +import org.apache.parquet.io.ParquetDecodingException; + +/** + * PFOR values reader for INT32 type with lazy per-vector decoding. + * + *

Reads PFOR-encoded int values from the interleaved page layout. + * Each vector is decoded on first access using BytePacker-based unpacking. + * + *

Per-vector format: + *

+ * PforVectorInfo (7B): frame_of_reference(4) + bit_width(1) + num_exceptions(2)
+ * PackedValues: ceil(N * bit_width / 8) bytes
+ * ExceptionPositions: num_exceptions * 2 bytes
+ * ExceptionValues: num_exceptions * 4 bytes
+ * 
+ */ +public class PforValuesReaderForInt extends PforValuesReader { + + private int[] decodedValues; + + public PforValuesReaderForInt() { + super(); + } + + @Override + protected void allocateDecodedBuffer(int capacity) { + this.decodedValues = new int[capacity]; + } + + @Override + public int readInteger() { + if (currentIndex >= totalCount) { + throw new ParquetDecodingException("PFOR int data was already exhausted."); + } + ensureVectorDecoded(); + int indexInVector = currentIndex % vectorSize; + currentIndex++; + return decodedValues[indexInVector]; + } + + @Override + protected void decodeVector(int vectorIdx) { + int vectorLen = getVectorLength(vectorIdx); + int pos = getVectorDataPosition(vectorIdx); + + // Read PforVectorInfo (7 bytes) + int frameOfReference = getIntLE(vectorsData, pos); + int bitWidth = vectorsData.get(pos + 4) & 0xFF; + int numExceptions = getShortLE(vectorsData, pos + 5) & 0xFFFF; + pos += INT32_VECTOR_INFO_SIZE; + + // Unpack bit-packed deltas + int[] deltas = new int[vectorLen]; + if (bitWidth > 0) { + pos = unpackIntsWithBytePacker(vectorsData, pos, deltas, vectorLen, bitWidth); + } + + // Add frame of reference to reconstruct values + for (int i = 0; i < vectorLen; i++) { + decodedValues[i] = deltas[i] + frameOfReference; + } + + // Overwrite exception slots with their original values + if (numExceptions > 0) { + int[] excPositions = new int[numExceptions]; + for (int e = 0; e < numExceptions; e++) { + excPositions[e] = getShortLE(vectorsData, pos) & 0xFFFF; + pos += Short.BYTES; + } + for (int e = 0; e < numExceptions; e++) { + decodedValues[excPositions[e]] = getIntLE(vectorsData, pos); + pos += Integer.BYTES; + } + } + } + + /** Unpack bit-packed ints in groups of 8, returns position after packed data. */ + private int unpackIntsWithBytePacker(ByteBuffer buf, int pos, int[] output, int count, int bitWidth) { + BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); + int numFullGroups = count / 8; + int remaining = count % 8; + + for (int g = 0; g < numFullGroups; g++) { + packer.unpack8Values(buf, pos, output, g * 8); + pos += bitWidth; + } + + if (remaining > 0) { + int totalPackedBytes = (count * bitWidth + 7) / 8; + int alreadyRead = numFullGroups * bitWidth; + int partialBytes = totalPackedBytes - alreadyRead; + + byte[] padded = new byte[bitWidth]; + for (int i = 0; i < partialBytes; i++) { + padded[i] = buf.get(pos + i); + } + + int[] temp = new int[8]; + packer.unpack8Values(padded, 0, temp, 0); + System.arraycopy(temp, 0, output, numFullGroups * 8, remaining); + pos += partialBytes; + } + + return pos; + } + + private static int getShortLE(ByteBuffer buf, int pos) { + return (buf.get(pos) & 0xFF) | ((buf.get(pos + 1) & 0xFF) << 8); + } + + private static int getIntLE(ByteBuffer buf, int pos) { + return (buf.get(pos) & 0xFF) + | ((buf.get(pos + 1) & 0xFF) << 8) + | ((buf.get(pos + 2) & 0xFF) << 16) + | ((buf.get(pos + 3) & 0xFF) << 24); + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForLong.java b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForLong.java new file mode 100644 index 0000000000..91ac244369 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForLong.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.pfor; + +import static org.apache.parquet.column.values.pfor.PforConstants.*; + +import java.nio.ByteBuffer; +import org.apache.parquet.column.values.bitpacking.BytePackerForLong; +import org.apache.parquet.column.values.bitpacking.Packer; +import org.apache.parquet.io.ParquetDecodingException; + +/** + * PFOR values reader for INT64 type with lazy per-vector decoding. + * + *

Reads PFOR-encoded long values from the interleaved page layout. + * Each vector is decoded on first access using BytePackerForLong-based unpacking. + * + *

Per-vector format: + *

+ * PforVectorInfo (11B): frame_of_reference(8) + bit_width(1) + num_exceptions(2)
+ * PackedValues: ceil(N * bit_width / 8) bytes
+ * ExceptionPositions: num_exceptions * 2 bytes
+ * ExceptionValues: num_exceptions * 8 bytes
+ * 
+ */ +public class PforValuesReaderForLong extends PforValuesReader { + + private long[] decodedValues; + + public PforValuesReaderForLong() { + super(); + } + + @Override + protected void allocateDecodedBuffer(int capacity) { + this.decodedValues = new long[capacity]; + } + + @Override + public long readLong() { + if (currentIndex >= totalCount) { + throw new ParquetDecodingException("PFOR long data was already exhausted."); + } + ensureVectorDecoded(); + int indexInVector = currentIndex % vectorSize; + currentIndex++; + return decodedValues[indexInVector]; + } + + @Override + protected void decodeVector(int vectorIdx) { + int vectorLen = getVectorLength(vectorIdx); + int pos = getVectorDataPosition(vectorIdx); + + // Read PforVectorInfo (11 bytes) + long frameOfReference = getLongLE(vectorsData, pos); + int bitWidth = vectorsData.get(pos + 8) & 0xFF; + int numExceptions = getShortLE(vectorsData, pos + 9) & 0xFFFF; + pos += INT64_VECTOR_INFO_SIZE; + + // Unpack bit-packed deltas + long[] deltas = new long[vectorLen]; + if (bitWidth > 0) { + pos = unpackLongsWithBytePacker(vectorsData, pos, deltas, vectorLen, bitWidth); + } + + // Add frame of reference to reconstruct values + for (int i = 0; i < vectorLen; i++) { + decodedValues[i] = deltas[i] + frameOfReference; + } + + // Overwrite exception slots with their original values + if (numExceptions > 0) { + int[] excPositions = new int[numExceptions]; + for (int e = 0; e < numExceptions; e++) { + excPositions[e] = getShortLE(vectorsData, pos) & 0xFFFF; + pos += Short.BYTES; + } + for (int e = 0; e < numExceptions; e++) { + decodedValues[excPositions[e]] = getLongLE(vectorsData, pos); + pos += Long.BYTES; + } + } + } + + private int unpackLongsWithBytePacker(ByteBuffer buf, int pos, long[] output, int count, int bitWidth) { + BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(bitWidth); + int numFullGroups = count / 8; + int remaining = count % 8; + + for (int g = 0; g < numFullGroups; g++) { + packer.unpack8Values(buf, pos, output, g * 8); + pos += bitWidth; + } + + // Last group might have fewer than 8 values; zero-pad and unpack, + // but only advance pos by the actual bytes in the page. + if (remaining > 0) { + int totalPackedBytes = (count * bitWidth + 7) / 8; + int alreadyRead = numFullGroups * bitWidth; + int partialBytes = totalPackedBytes - alreadyRead; + + byte[] padded = new byte[bitWidth]; + for (int i = 0; i < partialBytes; i++) { + padded[i] = buf.get(pos + i); + } + + long[] temp = new long[8]; + packer.unpack8Values(padded, 0, temp, 0); + System.arraycopy(temp, 0, output, numFullGroups * 8, remaining); + pos += partialBytes; + } + + return pos; + } + + private static int getShortLE(ByteBuffer buf, int pos) { + return (buf.get(pos) & 0xFF) | ((buf.get(pos + 1) & 0xFF) << 8); + } + + private static long getLongLE(ByteBuffer buf, int pos) { + return (buf.get(pos) & 0xFFL) + | ((buf.get(pos + 1) & 0xFFL) << 8) + | ((buf.get(pos + 2) & 0xFFL) << 16) + | ((buf.get(pos + 3) & 0xFFL) << 24) + | ((buf.get(pos + 4) & 0xFFL) << 32) + | ((buf.get(pos + 5) & 0xFFL) << 40) + | ((buf.get(pos + 6) & 0xFFL) << 48) + | ((buf.get(pos + 7) & 0xFFL) << 56); + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesWriter.java new file mode 100644 index 0000000000..ab39301dad --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesWriter.java @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.pfor; + +import static org.apache.parquet.column.values.pfor.PforConstants.*; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; +import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.CapacityByteArrayOutputStream; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.bitpacking.BytePacker; +import org.apache.parquet.column.values.bitpacking.BytePackerForLong; +import org.apache.parquet.column.values.bitpacking.Packer; + +/** + * PFOR (Patched Frame of Reference) values writer for INT32 and INT64 columns. + * + *

PFOR compresses integer columns by subtracting the minimum value (FOR), + * selecting an optimal bit width via a histogram-based cost model, bit-packing + * the deltas, and storing outlier values (exceptions) separately. + * + *

Writing is incremental: values are buffered in a fixed-size vector buffer, + * and each full vector is encoded and flushed to the output stream immediately. + * On {@link #getBytes()}, any remaining partial vector is flushed, and the + * final page bytes are assembled. + * + *

Interleaved Page Layout: + *

+ * ┌─────────┬──────────────────────┬──────────────┬──────────────┬─────┐
+ * │ Header  │ Offset Array         │ Vector 0     │ Vector 1     │ ... │
+ * │ 7 bytes │ 4B × numVectors │ (interleaved)│ (interleaved)│     │
+ * └─────────┴──────────────────────┴──────────────┴──────────────┴─────┘
+ * 
+ * + *

Each vector contains interleaved: + * PforVectorInfo(7B/11B) + PackedValues + ExceptionPositions + ExceptionValues + */ +public abstract class PforValuesWriter extends ValuesWriter { + + protected final int initialCapacity; + protected final int pageSize; + protected final ByteBufferAllocator allocator; + protected final int vectorSize; + protected final int logVectorSize; + + PforValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator, int vectorSize) { + PforConstants.validateVectorSize(vectorSize); + this.initialCapacity = initialCapacity; + this.pageSize = pageSize; + this.allocator = allocator; + this.vectorSize = vectorSize; + this.logVectorSize = Integer.numberOfTrailingZeros(vectorSize); + } + + @Override + public Encoding getEncoding() { + return Encoding.PFOR; + } + + /** INT32 writer. Buffers one vector at a time, encodes and flushes when full. */ + public static class IntPforValuesWriter extends PforValuesWriter { + private final int[] vectorBuffer; + private int bufferCount; + private int totalCount; + private CapacityByteArrayOutputStream encodedVectors; + private final List vectorByteSizes; + + public IntPforValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) { + this(initialCapacity, pageSize, allocator, DEFAULT_VECTOR_SIZE); + } + + public IntPforValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator, int vectorSize) { + super(initialCapacity, pageSize, allocator, vectorSize); + this.vectorBuffer = new int[vectorSize]; + this.bufferCount = 0; + this.totalCount = 0; + this.encodedVectors = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator); + this.vectorByteSizes = new ArrayList<>(); + } + + @Override + public void writeInteger(int v) { + vectorBuffer[bufferCount++] = v; + totalCount++; + if (bufferCount == vectorSize) { + encodeAndFlushVector(bufferCount); + bufferCount = 0; + } + } + + private void encodeAndFlushVector(int vectorLen) { + // Find minimum value (frame of reference) + int minValue = vectorBuffer[0]; + for (int i = 1; i < vectorLen; i++) { + if (vectorBuffer[i] < minValue) { + minValue = vectorBuffer[i]; + } + } + + // Compute unsigned deltas + int[] deltas = new int[vectorLen]; + for (int i = 0; i < vectorLen; i++) { + deltas[i] = vectorBuffer[i] - minValue; + } + + // Find optimal bit width via cost model + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForInt(deltas, vectorLen); + int bitWidth = result.bitWidth; + int numExceptions = result.numExceptions; + + // Collect exceptions: values whose delta doesn't fit in bitWidth bits + short[] excPositions = new short[numExceptions]; + int[] excValues = new int[numExceptions]; + int excIdx = 0; + + if (numExceptions > 0) { + int mask = (bitWidth == 32) ? -1 : (1 << bitWidth) - 1; + for (int i = 0; i < vectorLen; i++) { + if (Integer.compareUnsigned(deltas[i], mask) > 0) { + excPositions[excIdx] = (short) i; + excValues[excIdx] = vectorBuffer[i]; // original value, not delta + excIdx++; + deltas[i] = 0; // placeholder in packed data + } + } + } + + long startSize = encodedVectors.size(); + + // PforVectorInfo: frame_of_reference(4) + bit_width(1) + num_exceptions(2) = 7B + ByteBuffer vectorInfo = ByteBuffer.allocate(INT32_VECTOR_INFO_SIZE).order(ByteOrder.LITTLE_ENDIAN); + vectorInfo.putInt(minValue); + vectorInfo.put((byte) bitWidth); + vectorInfo.putShort((short) numExceptions); + encodedVectors.write(vectorInfo.array(), 0, INT32_VECTOR_INFO_SIZE); + + // Pack deltas + if (bitWidth > 0) { + packIntsWithBytePacker(deltas, vectorLen, bitWidth); + } + + // Exception positions then values + if (numExceptions > 0) { + ByteBuffer excPosBuf = + ByteBuffer.allocate(numExceptions * Short.BYTES).order(ByteOrder.LITTLE_ENDIAN); + for (int i = 0; i < numExceptions; i++) { + excPosBuf.putShort(excPositions[i]); + } + encodedVectors.write(excPosBuf.array(), 0, numExceptions * Short.BYTES); + + ByteBuffer excValBuf = + ByteBuffer.allocate(numExceptions * Integer.BYTES).order(ByteOrder.LITTLE_ENDIAN); + for (int i = 0; i < numExceptions; i++) { + excValBuf.putInt(excValues[i]); + } + encodedVectors.write(excValBuf.array(), 0, numExceptions * Integer.BYTES); + } + + vectorByteSizes.add((int) (encodedVectors.size() - startSize)); + } + + private void packIntsWithBytePacker(int[] values, int count, int bitWidth) { + BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); + int numFullGroups = count / 8; + int remaining = count % 8; + byte[] packed = new byte[bitWidth]; + + for (int g = 0; g < numFullGroups; g++) { + packer.pack8Values(values, g * 8, packed, 0); + encodedVectors.write(packed, 0, bitWidth); + } + + // Partial last group: pack 8 values (zero-padded), but only write + // ceil(count * bitWidth / 8) - alreadyWritten bytes per spec. + if (remaining > 0) { + int[] padded = new int[8]; + System.arraycopy(values, numFullGroups * 8, padded, 0, remaining); + packer.pack8Values(padded, 0, packed, 0); + int totalPackedBytes = (count * bitWidth + 7) / 8; + int alreadyWritten = numFullGroups * bitWidth; + encodedVectors.write(packed, 0, totalPackedBytes - alreadyWritten); + } + } + + @Override + public long getBufferedSize() { + return encodedVectors.size() + (long) bufferCount * Integer.BYTES; + } + + @Override + public BytesInput getBytes() { + if (totalCount == 0) { + return BytesInput.empty(); + } + + if (bufferCount > 0) { + encodeAndFlushVector(bufferCount); + bufferCount = 0; + } + + int numVectors = vectorByteSizes.size(); + + // Header: packing_mode(1) + log_vector_size(1) + value_byte_width(1) + num_elements(4) = 7B + ByteBuffer header = ByteBuffer.allocate(PFOR_HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + header.put((byte) PFOR_PACKING_MODE_FOR); + header.put((byte) logVectorSize); + header.put((byte) INT32_VALUE_BYTE_WIDTH); + header.putInt(totalCount); + + int offsetArraySize = numVectors * Integer.BYTES; + ByteBuffer offsets = ByteBuffer.allocate(offsetArraySize).order(ByteOrder.LITTLE_ENDIAN); + int currentOffset = offsetArraySize; + for (int v = 0; v < numVectors; v++) { + offsets.putInt(currentOffset); + currentOffset += vectorByteSizes.get(v); + } + + return BytesInput.concat( + BytesInput.from(header.array()), BytesInput.from(offsets.array()), BytesInput.from(encodedVectors)); + } + + @Override + public void reset() { + bufferCount = 0; + totalCount = 0; + encodedVectors.reset(); + vectorByteSizes.clear(); + } + + @Override + public void close() { + encodedVectors.close(); + } + + @Override + public long getAllocatedSize() { + return (long) vectorBuffer.length * Integer.BYTES + encodedVectors.getCapacity(); + } + + @Override + public String memUsageString(String prefix) { + return String.format( + "%s IntPforValuesWriter %d values, %d bytes allocated", prefix, totalCount, getAllocatedSize()); + } + } + + /** INT64 writer. Same structure as IntPforValuesWriter but uses longs. */ + public static class LongPforValuesWriter extends PforValuesWriter { + private final long[] vectorBuffer; + private int bufferCount; + private int totalCount; + private CapacityByteArrayOutputStream encodedVectors; + private final List vectorByteSizes; + + public LongPforValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) { + this(initialCapacity, pageSize, allocator, DEFAULT_VECTOR_SIZE); + } + + public LongPforValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator, int vectorSize) { + super(initialCapacity, pageSize, allocator, vectorSize); + this.vectorBuffer = new long[vectorSize]; + this.bufferCount = 0; + this.totalCount = 0; + this.encodedVectors = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator); + this.vectorByteSizes = new ArrayList<>(); + } + + @Override + public void writeLong(long v) { + vectorBuffer[bufferCount++] = v; + totalCount++; + if (bufferCount == vectorSize) { + encodeAndFlushVector(bufferCount); + bufferCount = 0; + } + } + + private void encodeAndFlushVector(int vectorLen) { + long minValue = vectorBuffer[0]; + for (int i = 1; i < vectorLen; i++) { + if (vectorBuffer[i] < minValue) { + minValue = vectorBuffer[i]; + } + } + + long[] deltas = new long[vectorLen]; + for (int i = 0; i < vectorLen; i++) { + deltas[i] = vectorBuffer[i] - minValue; + } + + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForLong(deltas, vectorLen); + int bitWidth = result.bitWidth; + int numExceptions = result.numExceptions; + + short[] excPositions = new short[numExceptions]; + long[] excValues = new long[numExceptions]; + int excIdx = 0; + + if (numExceptions > 0) { + long mask = (bitWidth == 64) ? -1L : (1L << bitWidth) - 1L; + for (int i = 0; i < vectorLen; i++) { + if (Long.compareUnsigned(deltas[i], mask) > 0) { + excPositions[excIdx] = (short) i; + excValues[excIdx] = vectorBuffer[i]; // original value + excIdx++; + deltas[i] = 0; // placeholder + } + } + } + + long startSize = encodedVectors.size(); + + // PforVectorInfo: frame_of_reference(8) + bit_width(1) + num_exceptions(2) = 11B + ByteBuffer vectorInfo = ByteBuffer.allocate(INT64_VECTOR_INFO_SIZE).order(ByteOrder.LITTLE_ENDIAN); + vectorInfo.putLong(minValue); + vectorInfo.put((byte) bitWidth); + vectorInfo.putShort((short) numExceptions); + encodedVectors.write(vectorInfo.array(), 0, INT64_VECTOR_INFO_SIZE); + + if (bitWidth > 0) { + packLongsWithBytePacker(deltas, vectorLen, bitWidth); + } + + if (numExceptions > 0) { + ByteBuffer excPosBuf = + ByteBuffer.allocate(numExceptions * Short.BYTES).order(ByteOrder.LITTLE_ENDIAN); + for (int i = 0; i < numExceptions; i++) { + excPosBuf.putShort(excPositions[i]); + } + encodedVectors.write(excPosBuf.array(), 0, numExceptions * Short.BYTES); + + ByteBuffer excValBuf = + ByteBuffer.allocate(numExceptions * Long.BYTES).order(ByteOrder.LITTLE_ENDIAN); + for (int i = 0; i < numExceptions; i++) { + excValBuf.putLong(excValues[i]); + } + encodedVectors.write(excValBuf.array(), 0, numExceptions * Long.BYTES); + } + + vectorByteSizes.add((int) (encodedVectors.size() - startSize)); + } + + private void packLongsWithBytePacker(long[] values, int count, int bitWidth) { + BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(bitWidth); + int numFullGroups = count / 8; + int remaining = count % 8; + byte[] packed = new byte[bitWidth]; + + for (int g = 0; g < numFullGroups; g++) { + packer.pack8Values(values, g * 8, packed, 0); + encodedVectors.write(packed, 0, bitWidth); + } + + if (remaining > 0) { + long[] padded = new long[8]; + System.arraycopy(values, numFullGroups * 8, padded, 0, remaining); + packer.pack8Values(padded, 0, packed, 0); + int totalPackedBytes = (count * bitWidth + 7) / 8; + int alreadyWritten = numFullGroups * bitWidth; + encodedVectors.write(packed, 0, totalPackedBytes - alreadyWritten); + } + } + + @Override + public long getBufferedSize() { + return encodedVectors.size() + (long) bufferCount * Long.BYTES; + } + + @Override + public BytesInput getBytes() { + if (totalCount == 0) { + return BytesInput.empty(); + } + + if (bufferCount > 0) { + encodeAndFlushVector(bufferCount); + bufferCount = 0; + } + + int numVectors = vectorByteSizes.size(); + + ByteBuffer header = ByteBuffer.allocate(PFOR_HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + header.put((byte) PFOR_PACKING_MODE_FOR); + header.put((byte) logVectorSize); + header.put((byte) INT64_VALUE_BYTE_WIDTH); + header.putInt(totalCount); + + int offsetArraySize = numVectors * Integer.BYTES; + ByteBuffer offsets = ByteBuffer.allocate(offsetArraySize).order(ByteOrder.LITTLE_ENDIAN); + int currentOffset = offsetArraySize; + for (int v = 0; v < numVectors; v++) { + offsets.putInt(currentOffset); + currentOffset += vectorByteSizes.get(v); + } + + return BytesInput.concat( + BytesInput.from(header.array()), BytesInput.from(offsets.array()), BytesInput.from(encodedVectors)); + } + + @Override + public void reset() { + bufferCount = 0; + totalCount = 0; + encodedVectors.reset(); + vectorByteSizes.clear(); + } + + @Override + public void close() { + encodedVectors.close(); + } + + @Override + public long getAllocatedSize() { + return (long) vectorBuffer.length * Long.BYTES + encodedVectors.getCapacity(); + } + + @Override + public String memUsageString(String prefix) { + return String.format( + "%s LongPforValuesWriter %d values, %d bytes allocated", prefix, totalCount, getAllocatedSize()); + } + } +} From 6304bbee1bb8a51efc7ee87810be0427ff2efa83 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Tue, 21 Apr 2026 00:15:22 +0000 Subject: [PATCH 2/8] Integrate PFOR encoding into parquet-java framework Wires PFOR encoding into the parquet-java read/write pipeline: - Encoding.java: add PFOR enum with INT32/INT64 reader dispatch - ParquetProperties.java: add pforEnabled column property with isPforEnabled() and builder methods withPforEncoding() - DefaultV2ValuesWriterFactory.java: PFOR takes priority over BYTE_STREAM_SPLIT and DELTA_BINARY_PACKED for INT32/INT64 - ParquetMetadataConverter.java: guard for PFOR until thrift spec is merged upstream --- .../org/apache/parquet/column/Encoding.java | 23 ++++++++++ .../parquet/column/ParquetProperties.java | 46 +++++++++++++++++++ .../factory/DefaultV2ValuesWriterFactory.java | 15 +++++- .../converter/ParquetMetadataConverter.java | 4 ++ 4 files changed, 86 insertions(+), 2 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java index 874c99fded..3241c3121f 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java @@ -36,6 +36,8 @@ import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForInteger; import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForLong; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; +import org.apache.parquet.column.values.pfor.PforValuesReaderForInt; +import org.apache.parquet.column.values.pfor.PforValuesReaderForLong; import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader; import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader; import org.apache.parquet.column.values.dictionary.DictionaryValuesReader; @@ -147,6 +149,27 @@ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valu } }, + /** + * PFOR (Patched Frame of Reference) encoding for INT32 and INT64 types. + * Compresses integer columns by subtracting the minimum value, selecting an + * optimal bit width via a cost model, bit-packing the deltas, and storing + * outlier values as exceptions. + */ + PFOR { + @Override + public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) { + switch (descriptor.getType()) { + case INT32: + return new PforValuesReaderForInt(); + case INT64: + return new PforValuesReaderForLong(); + default: + throw new ParquetDecodingException( + "PFOR encoding is only supported for INT32 and INT64, not " + descriptor.getType()); + } + } + }, + /** * @deprecated This is no longer used, and has been replaced by {@link #RLE} * which is combination of bit packing and rle diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index f29214b458..4036d35e34 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -50,6 +50,7 @@ public class ParquetProperties { public static final int DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE; public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true; public static final boolean DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED = false; + public static final boolean DEFAULT_IS_PFOR_ENABLED = false; public static final WriterVersion DEFAULT_WRITER_VERSION = WriterVersion.PARQUET_1_0; public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true; public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100; @@ -132,6 +133,7 @@ public static WriterVersion fromString(String name) { private final int pageRowCountLimit; private final boolean pageWriteChecksumEnabled; private final ColumnProperty byteStreamSplitEnabled; + private final ColumnProperty pforEnabled; private final Map extraMetaData; private final ColumnProperty statistics; private final ColumnProperty sizeStatistics; @@ -164,6 +166,7 @@ private ParquetProperties(Builder builder) { this.pageRowCountLimit = builder.pageRowCountLimit; this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled; this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build(); + this.pforEnabled = builder.pforEnabled.build(); this.extraMetaData = builder.extraMetaData; this.statistics = builder.statistics.build(); this.sizeStatistics = builder.sizeStatistics.build(); @@ -259,6 +262,23 @@ public boolean isByteStreamSplitEnabled(ColumnDescriptor column) { } } + /** + * Check if PFOR encoding is enabled for the given column. + * PFOR encoding is only supported for INT32 and INT64 types. + * + * @param column the column descriptor + * @return true if PFOR encoding is enabled for this column + */ + public boolean isPforEnabled(ColumnDescriptor column) { + switch (column.getPrimitiveType().getPrimitiveTypeName()) { + case INT32: + case INT64: + return pforEnabled.getValue(column); + default: + return false; + } + } + public ByteBufferAllocator getAllocator() { return allocator; } @@ -416,6 +436,7 @@ public static class Builder { private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT; private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED; private final ColumnProperty.Builder byteStreamSplitEnabled; + private final ColumnProperty.Builder pforEnabled; private Map extraMetaData = new HashMap<>(); private final ColumnProperty.Builder statistics; private final ColumnProperty.Builder sizeStatistics; @@ -427,6 +448,7 @@ private Builder() { DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED ? ByteStreamSplitMode.FLOATING_POINT : ByteStreamSplitMode.NONE); + pforEnabled = ColumnProperty.builder().withDefaultValue(DEFAULT_IS_PFOR_ENABLED); bloomFilterEnabled = ColumnProperty.builder().withDefaultValue(DEFAULT_BLOOM_FILTER_ENABLED); bloomFilterNDVs = ColumnProperty.builder().withDefaultValue(null); bloomFilterFPPs = ColumnProperty.builder().withDefaultValue(DEFAULT_BLOOM_FILTER_FPP); @@ -457,6 +479,7 @@ private Builder(ParquetProperties toCopy) { this.numBloomFilterCandidates = ColumnProperty.builder(toCopy.numBloomFilterCandidates); this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes; this.byteStreamSplitEnabled = ColumnProperty.builder(toCopy.byteStreamSplitEnabled); + this.pforEnabled = ColumnProperty.builder(toCopy.pforEnabled); this.extraMetaData = toCopy.extraMetaData; this.statistics = ColumnProperty.builder(toCopy.statistics); this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics); @@ -534,6 +557,29 @@ public Builder withExtendedByteStreamSplitEncoding(boolean enable) { return this; } + /** + * Enable or disable PFOR encoding for INT32 and INT64 columns. + * + * @param enable whether PFOR encoding should be enabled + * @return this builder for method chaining. + */ + public Builder withPforEncoding(boolean enable) { + this.pforEnabled.withDefaultValue(enable); + return this; + } + + /** + * Enable or disable PFOR encoding for the specified column. + * + * @param columnPath the path of the column (dot-string) + * @param enable whether PFOR encoding should be enabled + * @return this builder for method chaining. + */ + public Builder withPforEncoding(String columnPath, boolean enable) { + this.pforEnabled.withValue(columnPath, enable); + return this; + } + /** * Set the Parquet format dictionary page size. * diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java index c50b4e49c5..5891d943b5 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java @@ -27,6 +27,7 @@ import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesWriter; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger; +import org.apache.parquet.column.values.pfor.PforValuesWriter; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong; import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter; @@ -115,7 +116,12 @@ private ValuesWriter getBinaryValuesWriter(ColumnDescriptor path) { private ValuesWriter getInt32ValuesWriter(ColumnDescriptor path) { final ValuesWriter fallbackWriter; - if (parquetProperties.isByteStreamSplitEnabled(path)) { + if (this.parquetProperties.isPforEnabled(path)) { + fallbackWriter = new PforValuesWriter.IntPforValuesWriter( + parquetProperties.getInitialSlabSize(), + parquetProperties.getPageSizeThreshold(), + parquetProperties.getAllocator()); + } else if (parquetProperties.isByteStreamSplitEnabled(path)) { fallbackWriter = new ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter( parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), @@ -132,7 +138,12 @@ private ValuesWriter getInt32ValuesWriter(ColumnDescriptor path) { private ValuesWriter getInt64ValuesWriter(ColumnDescriptor path) { final ValuesWriter fallbackWriter; - if (parquetProperties.isByteStreamSplitEnabled(path)) { + if (this.parquetProperties.isPforEnabled(path)) { + fallbackWriter = new PforValuesWriter.LongPforValuesWriter( + parquetProperties.getInitialSlabSize(), + parquetProperties.getPageSizeThreshold(), + parquetProperties.getAllocator()); + } else if (parquetProperties.isByteStreamSplitEnabled(path)) { fallbackWriter = new ByteStreamSplitValuesWriter.LongByteStreamSplitValuesWriter( parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 60150439a6..7ee76063d6 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -748,6 +748,10 @@ public org.apache.parquet.column.Encoding getEncoding(Encoding encoding) { } public Encoding getEncoding(org.apache.parquet.column.Encoding encoding) { + // PFOR encoding is not yet part of the parquet-format specification + if (encoding == org.apache.parquet.column.Encoding.PFOR) { + throw new IllegalArgumentException("PFOR encoding is not yet supported in the parquet-format specification"); + } return Encoding.valueOf(encoding.name()); } From ee0b108f9f5ef40a59662b89262c01e1dc03ce53 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Tue, 21 Apr 2026 00:25:15 +0000 Subject: [PATCH 3/8] Add PFOR encoding unit tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 64 tests covering: - PforEncoderDecoderTest: bit width utilities and histogram-based cost model - PforBitPackingTest: round-trip correctness across bit widths 0-64, partial groups, page header format - PforValuesEndToEndTest: full writer→reader pipeline including reset/reuse, skip, edge cases, random data --- .../values/pfor/PforBitPackingTest.java | 269 +++++++++++ .../values/pfor/PforEncoderDecoderTest.java | 206 ++++++++ .../values/pfor/PforValuesEndToEndTest.java | 446 ++++++++++++++++++ 3 files changed, 921 insertions(+) create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforBitPackingTest.java create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforEncoderDecoderTest.java create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforValuesEndToEndTest.java diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforBitPackingTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforBitPackingTest.java new file mode 100644 index 0000000000..20732f787c --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforBitPackingTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.pfor; + +import static org.junit.Assert.*; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.DirectByteBufferAllocator; +import org.junit.Test; + +/** + * Tests focused on PFOR bit-packing correctness across different bit widths + * and vector sizes. + */ +public class PforBitPackingTest { + + // Round-trip helper that verifies bit-packing for int values in a given range + private void verifyIntRoundTrip(int[] values) throws Exception { + int capacity = Math.max(256, values.length * 8); + PforValuesWriter.IntPforValuesWriter writer = new PforValuesWriter.IntPforValuesWriter( + capacity, capacity, new DirectByteBufferAllocator()); + + for (int v : values) { + writer.writeInteger(v); + } + + BytesInput bytes = writer.getBytes(); + PforValuesReaderForInt reader = new PforValuesReaderForInt(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(bytes.toByteBuffer())); + + for (int i = 0; i < values.length; i++) { + assertEquals("Mismatch at index " + i, values[i], reader.readInteger()); + } + + writer.close(); + } + + private void verifyLongRoundTrip(long[] values) throws Exception { + int capacity = Math.max(512, values.length * 16); + PforValuesWriter.LongPforValuesWriter writer = new PforValuesWriter.LongPforValuesWriter( + capacity, capacity, new DirectByteBufferAllocator()); + + for (long v : values) { + writer.writeLong(v); + } + + BytesInput bytes = writer.getBytes(); + PforValuesReaderForLong reader = new PforValuesReaderForLong(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(bytes.toByteBuffer())); + + for (int i = 0; i < values.length; i++) { + assertEquals("Mismatch at index " + i, values[i], reader.readLong()); + } + + writer.close(); + } + + // ========== INT32 Bit Width Coverage ========== + + @Test + public void testIntBitWidth0() throws Exception { + // All same value → bitWidth=0, no packed bytes + int[] values = new int[100]; + java.util.Arrays.fill(values, 777); + verifyIntRoundTrip(values); + } + + @Test + public void testIntBitWidth1() throws Exception { + // Values: base + {0, 1} + int[] values = new int[100]; + for (int i = 0; i < 100; i++) { + values[i] = 1000 + (i % 2); + } + verifyIntRoundTrip(values); + } + + @Test + public void testIntBitWidth8() throws Exception { + int[] values = new int[1024]; + for (int i = 0; i < 1024; i++) { + values[i] = 5000 + (i % 256); + } + verifyIntRoundTrip(values); + } + + @Test + public void testIntBitWidth16() throws Exception { + int[] values = new int[1024]; + for (int i = 0; i < 1024; i++) { + values[i] = i; + } + verifyIntRoundTrip(values); + } + + @Test + public void testIntBitWidth32() throws Exception { + // Full range int values + int[] values = {Integer.MIN_VALUE, -1, 0, 1, Integer.MAX_VALUE, + 0x7FFFFFFF, 0x40000000, -2147483648}; + verifyIntRoundTrip(values); + } + + @Test + public void testIntPartialGroup() throws Exception { + // 13 values: 1 full group of 8 + 5 remaining + int[] values = new int[13]; + for (int i = 0; i < 13; i++) { + values[i] = i * 100; + } + verifyIntRoundTrip(values); + } + + @Test + public void testIntExactlyOneGroup() throws Exception { + // Exactly 8 values + int[] values = {10, 20, 30, 40, 50, 60, 70, 80}; + verifyIntRoundTrip(values); + } + + @Test + public void testIntSevenValues() throws Exception { + // Less than one full group + int[] values = {1, 2, 3, 4, 5, 6, 7}; + verifyIntRoundTrip(values); + } + + // ========== INT64 Bit Width Coverage ========== + + @Test + public void testLongBitWidth0() throws Exception { + long[] values = new long[100]; + java.util.Arrays.fill(values, 123456789L); + verifyLongRoundTrip(values); + } + + @Test + public void testLongBitWidth1() throws Exception { + long[] values = new long[100]; + for (int i = 0; i < 100; i++) { + values[i] = 1_000_000L + (i % 2); + } + verifyLongRoundTrip(values); + } + + @Test + public void testLongBitWidth32() throws Exception { + long[] values = new long[1024]; + for (int i = 0; i < 1024; i++) { + values[i] = (long) i * 1_000_000L; + } + verifyLongRoundTrip(values); + } + + @Test + public void testLongBitWidth64() throws Exception { + long[] values = {Long.MIN_VALUE, Long.MAX_VALUE, 0L, -1L, 1L}; + verifyLongRoundTrip(values); + } + + @Test + public void testLongPartialGroup() throws Exception { + long[] values = new long[13]; + for (int i = 0; i < 13; i++) { + values[i] = i * 100_000L; + } + verifyLongRoundTrip(values); + } + + // ========== Page Header Verification ========== + + @Test + public void testIntPageHeaderFormat() throws Exception { + int[] values = new int[100]; + for (int i = 0; i < 100; i++) { + values[i] = i; + } + + PforValuesWriter.IntPforValuesWriter writer = new PforValuesWriter.IntPforValuesWriter( + 1024, 1024, new DirectByteBufferAllocator()); + for (int v : values) { + writer.writeInteger(v); + } + + ByteBuffer buf = writer.getBytes().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); + + // Verify header + assertEquals("packing_mode", 0, buf.get(0) & 0xFF); + assertEquals("log_vector_size", 10, buf.get(1) & 0xFF); + assertEquals("value_byte_width", 4, buf.get(2) & 0xFF); + + buf.position(3); + assertEquals("num_elements", 100, buf.getInt()); + + writer.close(); + } + + @Test + public void testLongPageHeaderFormat() throws Exception { + long[] values = new long[100]; + for (int i = 0; i < 100; i++) { + values[i] = i; + } + + PforValuesWriter.LongPforValuesWriter writer = new PforValuesWriter.LongPforValuesWriter( + 1024, 1024, new DirectByteBufferAllocator()); + for (long v : values) { + writer.writeLong(v); + } + + ByteBuffer buf = writer.getBytes().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); + + assertEquals("packing_mode", 0, buf.get(0) & 0xFF); + assertEquals("log_vector_size", 10, buf.get(1) & 0xFF); + assertEquals("value_byte_width", 8, buf.get(2) & 0xFF); + + buf.position(3); + assertEquals("num_elements", 100, buf.getInt()); + + writer.close(); + } + + // ========== Exception Handling ========== + + @Test + public void testIntManyExceptions() throws Exception { + // More than half are outliers — cost model should widen bit width + int[] values = new int[100]; + for (int i = 0; i < 100; i++) { + values[i] = (i % 2 == 0) ? i : 1_000_000 + i; + } + verifyIntRoundTrip(values); + } + + @Test + public void testIntAllExceptionsScenario() throws Exception { + // Very wide range but few values: might make everything exceptions or wide pack + int[] values = {0, Integer.MAX_VALUE, Integer.MIN_VALUE, 42, -42}; + verifyIntRoundTrip(values); + } + + @Test + public void testLongManyExceptions() throws Exception { + long[] values = new long[100]; + for (int i = 0; i < 100; i++) { + values[i] = (i % 2 == 0) ? i : Long.MAX_VALUE - i; + } + verifyLongRoundTrip(values); + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforEncoderDecoderTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforEncoderDecoderTest.java new file mode 100644 index 0000000000..fa8ed98249 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforEncoderDecoderTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.pfor; + +import static org.junit.Assert.*; + +import org.junit.Test; + +/** + * Tests for PFOR cost model and bit width utilities. + */ +public class PforEncoderDecoderTest { + + // ========== Bit Width Tests ========== + + @Test + public void testBitWidthForInt() { + assertEquals(0, PforEncoderDecoder.bitWidthForInt(0)); + assertEquals(1, PforEncoderDecoder.bitWidthForInt(1)); + assertEquals(2, PforEncoderDecoder.bitWidthForInt(2)); + assertEquals(2, PforEncoderDecoder.bitWidthForInt(3)); + assertEquals(3, PforEncoderDecoder.bitWidthForInt(4)); + assertEquals(8, PforEncoderDecoder.bitWidthForInt(255)); + assertEquals(9, PforEncoderDecoder.bitWidthForInt(256)); + assertEquals(16, PforEncoderDecoder.bitWidthForInt(65535)); + assertEquals(31, PforEncoderDecoder.bitWidthForInt(Integer.MAX_VALUE)); + // Unsigned: -1 == 0xFFFFFFFF → 32 bits + assertEquals(32, PforEncoderDecoder.bitWidthForInt(-1)); + } + + @Test + public void testBitWidthForLong() { + assertEquals(0, PforEncoderDecoder.bitWidthForLong(0L)); + assertEquals(1, PforEncoderDecoder.bitWidthForLong(1L)); + assertEquals(2, PforEncoderDecoder.bitWidthForLong(2L)); + assertEquals(2, PforEncoderDecoder.bitWidthForLong(3L)); + assertEquals(3, PforEncoderDecoder.bitWidthForLong(4L)); + assertEquals(8, PforEncoderDecoder.bitWidthForLong(255L)); + assertEquals(9, PforEncoderDecoder.bitWidthForLong(256L)); + assertEquals(16, PforEncoderDecoder.bitWidthForLong(65535L)); + assertEquals(31, PforEncoderDecoder.bitWidthForLong((long) Integer.MAX_VALUE)); + assertEquals(63, PforEncoderDecoder.bitWidthForLong(Long.MAX_VALUE)); + // Unsigned: -1 == 0xFFFFFFFFFFFFFFFF → 64 bits + assertEquals(64, PforEncoderDecoder.bitWidthForLong(-1L)); + } + + // ========== Cost Model Tests: INT32 ========== + + @Test + public void testOptimalBitWidthAllIdentical() { + // All deltas are 0 → bit_width=0, no exceptions + int[] deltas = new int[1024]; + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForInt(deltas, 1024); + assertEquals(0, result.bitWidth); + assertEquals(0, result.numExceptions); + } + + @Test + public void testOptimalBitWidthNoOutliers() { + // Deltas 0..255 → all fit in 8 bits, no exceptions + int[] deltas = new int[256]; + for (int i = 0; i < 256; i++) { + deltas[i] = i; + } + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForInt(deltas, 256); + assertEquals(8, result.bitWidth); + assertEquals(0, result.numExceptions); + } + + @Test + public void testOptimalBitWidthSingleOutlier() { + // 1023 values fit in 8 bits (0..255), 1 outlier at 100000 + // Cost at bw=8: 1024*8 + 0 = 8192 + // Cost at bw=17: 1024*17 + 0 = 17408 + // Cost at bw=0: 1024*0 + 1024*48 = 49152 + // Single outlier should still pick bw=8: cost=1024*8 + 1*48 = 8240 + int[] deltas = new int[1024]; + for (int i = 0; i < 1023; i++) { + deltas[i] = i % 256; + } + deltas[1023] = 100000; + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForInt(deltas, 1024); + assertEquals(8, result.bitWidth); + assertEquals(1, result.numExceptions); + } + + @Test + public void testOptimalBitWidthManyOutliers() { + // All values need 32 bits → bit_width=32, no exceptions + int[] deltas = new int[100]; + for (int i = 0; i < 100; i++) { + deltas[i] = Integer.MAX_VALUE - i; + } + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForInt(deltas, 100); + assertEquals(31, result.bitWidth); + assertEquals(0, result.numExceptions); + } + + @Test + public void testOptimalBitWidthSingleElement() { + int[] deltas = {42}; + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForInt(deltas, 1); + assertEquals(0, result.numExceptions); + assertTrue(result.bitWidth >= 6); // 42 needs 6 bits + } + + @Test + public void testOptimalBitWidthAllZeros() { + int[] deltas = new int[512]; + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForInt(deltas, 512); + assertEquals(0, result.bitWidth); + assertEquals(0, result.numExceptions); + } + + // ========== Cost Model Tests: INT64 ========== + + @Test + public void testOptimalBitWidthLongAllIdentical() { + long[] deltas = new long[1024]; + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForLong(deltas, 1024); + assertEquals(0, result.bitWidth); + assertEquals(0, result.numExceptions); + } + + @Test + public void testOptimalBitWidthLongNoOutliers() { + long[] deltas = new long[256]; + for (int i = 0; i < 256; i++) { + deltas[i] = i; + } + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForLong(deltas, 256); + assertEquals(8, result.bitWidth); + assertEquals(0, result.numExceptions); + } + + @Test + public void testOptimalBitWidthLongSingleOutlier() { + long[] deltas = new long[1024]; + for (int i = 0; i < 1023; i++) { + deltas[i] = i % 256; + } + deltas[1023] = 10_000_000_000L; + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForLong(deltas, 1024); + assertEquals(8, result.bitWidth); + assertEquals(1, result.numExceptions); + } + + @Test + public void testOptimalBitWidthLongLargeValues() { + long[] deltas = new long[100]; + for (int i = 0; i < 100; i++) { + deltas[i] = Long.MAX_VALUE - i; + } + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForLong(deltas, 100); + assertEquals(63, result.bitWidth); + assertEquals(0, result.numExceptions); + } + + // ========== Cost Model Sanity Checks ========== + + @Test + public void testCostModelPrefersFewerExceptions() { + // With 50% outliers, the cost model should widen bit width rather than + // storing half the values as exceptions + int[] deltas = new int[100]; + for (int i = 0; i < 50; i++) { + deltas[i] = i; // 0..49 fit in 6 bits + } + for (int i = 50; i < 100; i++) { + deltas[i] = 1000 + i; // need ~10 bits + } + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForInt(deltas, 100); + // Should choose to pack everything (10-11 bits) rather than 50 exceptions + assertEquals(0, result.numExceptions); + } + + @Test + public void testCostModelNeverExceedsMaxBitWidth() { + int[] deltas = {-1}; // 0xFFFFFFFF, needs 32 bits unsigned + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForInt(deltas, 1); + assertTrue(result.bitWidth <= 32); + } + + @Test + public void testCostModelLongNeverExceedsMaxBitWidth() { + long[] deltas = {-1L}; // 0xFFFFFFFFFFFFFFFF, needs 64 bits unsigned + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForLong(deltas, 1); + assertTrue(result.bitWidth <= 64); + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforValuesEndToEndTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforValuesEndToEndTest.java new file mode 100644 index 0000000000..d1c9aa80fd --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforValuesEndToEndTest.java @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.pfor; + +import static org.junit.Assert.*; + +import java.util.Random; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.DirectByteBufferAllocator; +import org.apache.parquet.column.Encoding; +import org.junit.Test; + +/** + * End-to-end tests for PFOR encoding and decoding pipeline. + * Tests the full writer → serialized bytes → reader round-trip. + */ +public class PforValuesEndToEndTest { + + private static final int DEFAULT_VECTOR_SIZE = PforConstants.DEFAULT_VECTOR_SIZE; + + // ========== INT32 Helper ========== + + private void roundTripInt(int[] values) throws Exception { + roundTripInt(values, DEFAULT_VECTOR_SIZE); + } + + private void roundTripInt(int[] values, int vectorSize) throws Exception { + PforValuesWriter.IntPforValuesWriter writer = null; + try { + int capacity = Math.max(256, values.length * 8); + writer = new PforValuesWriter.IntPforValuesWriter( + capacity, capacity, new DirectByteBufferAllocator(), vectorSize); + + for (int v : values) { + writer.writeInteger(v); + } + + assertEquals(Encoding.PFOR, writer.getEncoding()); + + BytesInput input = writer.getBytes(); + PforValuesReaderForInt reader = new PforValuesReaderForInt(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (int i = 0; i < values.length; i++) { + assertEquals("Value mismatch at index " + i, values[i], reader.readInteger()); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + // ========== INT64 Helper ========== + + private void roundTripLong(long[] values) throws Exception { + roundTripLong(values, DEFAULT_VECTOR_SIZE); + } + + private void roundTripLong(long[] values, int vectorSize) throws Exception { + PforValuesWriter.LongPforValuesWriter writer = null; + try { + int capacity = Math.max(512, values.length * 16); + writer = new PforValuesWriter.LongPforValuesWriter( + capacity, capacity, new DirectByteBufferAllocator(), vectorSize); + + for (long v : values) { + writer.writeLong(v); + } + + assertEquals(Encoding.PFOR, writer.getEncoding()); + + BytesInput input = writer.getBytes(); + PforValuesReaderForLong reader = new PforValuesReaderForLong(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (int i = 0; i < values.length; i++) { + assertEquals("Value mismatch at index " + i, values[i], reader.readLong()); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + // ========== INT32 Tests ========== + + @Test + public void testIntSimpleSequence() throws Exception { + int[] values = new int[100]; + for (int i = 0; i < 100; i++) { + values[i] = i; + } + roundTripInt(values); + } + + @Test + public void testIntAllIdentical() throws Exception { + int[] values = new int[1024]; + java.util.Arrays.fill(values, 42); + roundTripInt(values); + } + + @Test + public void testIntAllZeros() throws Exception { + int[] values = new int[1024]; + roundTripInt(values); + } + + @Test + public void testIntSingleElement() throws Exception { + roundTripInt(new int[]{12345}); + } + + @Test + public void testIntNegativeValues() throws Exception { + int[] values = {-100, -50, -1, 0, 1, 50, 100}; + roundTripInt(values); + } + + @Test + public void testIntMinMaxValues() throws Exception { + int[] values = {Integer.MIN_VALUE, Integer.MAX_VALUE, 0, -1, 1}; + roundTripInt(values); + } + + @Test + public void testIntAlternatingMinMax() throws Exception { + int[] values = new int[100]; + for (int i = 0; i < 100; i++) { + values[i] = (i % 2 == 0) ? Integer.MIN_VALUE : Integer.MAX_VALUE; + } + roundTripInt(values); + } + + @Test + public void testIntExactOneVector() throws Exception { + int[] values = new int[1024]; + for (int i = 0; i < 1024; i++) { + values[i] = i * 3; + } + roundTripInt(values); + } + + @Test + public void testIntMultipleVectors() throws Exception { + int[] values = new int[3000]; + for (int i = 0; i < 3000; i++) { + values[i] = i * 7 - 10000; + } + roundTripInt(values); + } + + @Test + public void testIntPartialLastVector() throws Exception { + // 1025 values = 1 full vector + 1 partial + int[] values = new int[1025]; + for (int i = 0; i < 1025; i++) { + values[i] = i; + } + roundTripInt(values); + } + + @Test + public void testIntWithOutliers() throws Exception { + // Mostly small values with a few large outliers + int[] values = new int[1024]; + for (int i = 0; i < 1024; i++) { + values[i] = i % 100; + } + values[0] = 1_000_000; + values[512] = -1_000_000; + values[1023] = Integer.MAX_VALUE; + roundTripInt(values); + } + + @Test + public void testIntLargeRandom() throws Exception { + Random rng = new Random(42); + int[] values = new int[10000]; + for (int i = 0; i < 10000; i++) { + values[i] = rng.nextInt(); + } + roundTripInt(values); + } + + @Test + public void testIntSmallVectorSize() throws Exception { + int[] values = new int[100]; + for (int i = 0; i < 100; i++) { + values[i] = i * 11; + } + roundTripInt(values, 8); // smallest valid vector size + } + + @Test + public void testIntTpcdsLikeDateKeys() throws Exception { + // Simulate TPC-DS date dimension keys: mostly sequential with a few gaps + int[] values = new int[2048]; + for (int i = 0; i < 2048; i++) { + values[i] = 2450815 + i + (i % 100 == 0 ? 100 : 0); + } + roundTripInt(values); + } + + // ========== INT64 Tests ========== + + @Test + public void testLongSimpleSequence() throws Exception { + long[] values = new long[100]; + for (int i = 0; i < 100; i++) { + values[i] = i; + } + roundTripLong(values); + } + + @Test + public void testLongAllIdentical() throws Exception { + long[] values = new long[1024]; + java.util.Arrays.fill(values, 999999999999L); + roundTripLong(values); + } + + @Test + public void testLongAllZeros() throws Exception { + long[] values = new long[1024]; + roundTripLong(values); + } + + @Test + public void testLongSingleElement() throws Exception { + roundTripLong(new long[]{Long.MAX_VALUE}); + } + + @Test + public void testLongNegativeValues() throws Exception { + long[] values = {-100_000_000_000L, -1L, 0L, 1L, 100_000_000_000L}; + roundTripLong(values); + } + + @Test + public void testLongMinMaxValues() throws Exception { + long[] values = {Long.MIN_VALUE, Long.MAX_VALUE, 0L, -1L, 1L}; + roundTripLong(values); + } + + @Test + public void testLongMultipleVectors() throws Exception { + long[] values = new long[3000]; + for (int i = 0; i < 3000; i++) { + values[i] = (long) i * 1_000_000L - 1_500_000_000L; + } + roundTripLong(values); + } + + @Test + public void testLongPartialLastVector() throws Exception { + long[] values = new long[1025]; + for (int i = 0; i < 1025; i++) { + values[i] = i * 17L; + } + roundTripLong(values); + } + + @Test + public void testLongWithOutliers() throws Exception { + long[] values = new long[1024]; + for (int i = 0; i < 1024; i++) { + values[i] = i % 100; + } + values[0] = Long.MAX_VALUE; + values[512] = Long.MIN_VALUE; + values[1023] = 10_000_000_000_000L; + roundTripLong(values); + } + + @Test + public void testLongLargeRandom() throws Exception { + Random rng = new Random(42); + long[] values = new long[10000]; + for (int i = 0; i < 10000; i++) { + values[i] = rng.nextLong(); + } + roundTripLong(values); + } + + @Test + public void testLongSmallVectorSize() throws Exception { + long[] values = new long[100]; + for (int i = 0; i < 100; i++) { + values[i] = i * 1_000_000L; + } + roundTripLong(values, 8); + } + + // ========== Writer Reset/Reuse ========== + + @Test + public void testIntWriterReset() throws Exception { + PforValuesWriter.IntPforValuesWriter writer = new PforValuesWriter.IntPforValuesWriter( + 1024, 1024, new DirectByteBufferAllocator()); + + // First batch + for (int i = 0; i < 100; i++) { + writer.writeInteger(i); + } + BytesInput bytes1 = writer.getBytes(); + assertTrue(bytes1.size() > 0); + + // Reset and second batch + writer.reset(); + for (int i = 0; i < 50; i++) { + writer.writeInteger(i * 2); + } + BytesInput bytes2 = writer.getBytes(); + assertTrue(bytes2.size() > 0); + + // Verify second batch reads correctly + PforValuesReaderForInt reader = new PforValuesReaderForInt(); + reader.initFromPage(50, ByteBufferInputStream.wrap(bytes2.toByteBuffer())); + for (int i = 0; i < 50; i++) { + assertEquals(i * 2, reader.readInteger()); + } + + writer.close(); + } + + @Test + public void testLongWriterReset() throws Exception { + PforValuesWriter.LongPforValuesWriter writer = new PforValuesWriter.LongPforValuesWriter( + 1024, 1024, new DirectByteBufferAllocator()); + + for (int i = 0; i < 100; i++) { + writer.writeLong(i * 1000L); + } + writer.getBytes(); + + writer.reset(); + for (int i = 0; i < 50; i++) { + writer.writeLong(i * 2000L); + } + BytesInput bytes2 = writer.getBytes(); + + PforValuesReaderForLong reader = new PforValuesReaderForLong(); + reader.initFromPage(50, ByteBufferInputStream.wrap(bytes2.toByteBuffer())); + for (int i = 0; i < 50; i++) { + assertEquals(i * 2000L, reader.readLong()); + } + + writer.close(); + } + + // ========== Reader Skip Tests ========== + + @Test + public void testIntSkip() throws Exception { + int[] values = new int[2048]; + for (int i = 0; i < 2048; i++) { + values[i] = i; + } + + PforValuesWriter.IntPforValuesWriter writer = new PforValuesWriter.IntPforValuesWriter( + 4096, 4096, new DirectByteBufferAllocator()); + for (int v : values) { + writer.writeInteger(v); + } + BytesInput bytes = writer.getBytes(); + + PforValuesReaderForInt reader = new PforValuesReaderForInt(); + reader.initFromPage(2048, ByteBufferInputStream.wrap(bytes.toByteBuffer())); + + // Skip first 1000 + reader.skip(1000); + assertEquals(1000, reader.readInteger()); + assertEquals(1001, reader.readInteger()); + + // Skip more + reader.skip(500); + assertEquals(1502, reader.readInteger()); + + writer.close(); + } + + @Test + public void testLongSkip() throws Exception { + long[] values = new long[2048]; + for (int i = 0; i < 2048; i++) { + values[i] = i * 100L; + } + + PforValuesWriter.LongPforValuesWriter writer = new PforValuesWriter.LongPforValuesWriter( + 4096, 4096, new DirectByteBufferAllocator()); + for (long v : values) { + writer.writeLong(v); + } + BytesInput bytes = writer.getBytes(); + + PforValuesReaderForLong reader = new PforValuesReaderForLong(); + reader.initFromPage(2048, ByteBufferInputStream.wrap(bytes.toByteBuffer())); + + reader.skip(1000); + assertEquals(100000L, reader.readLong()); + + writer.close(); + } + + // ========== Empty Input ========== + + @Test + public void testIntEmptyInput() throws Exception { + PforValuesWriter.IntPforValuesWriter writer = new PforValuesWriter.IntPforValuesWriter( + 256, 256, new DirectByteBufferAllocator()); + BytesInput bytes = writer.getBytes(); + assertEquals(0, bytes.size()); + writer.close(); + } + + @Test + public void testLongEmptyInput() throws Exception { + PforValuesWriter.LongPforValuesWriter writer = new PforValuesWriter.LongPforValuesWriter( + 256, 256, new DirectByteBufferAllocator()); + BytesInput bytes = writer.getBytes(); + assertEquals(0, bytes.size()); + writer.close(); + } +} From 9ab7651921e5144eae5c04fd5089ad2b0375a99c Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Tue, 21 Apr 2026 00:40:12 +0000 Subject: [PATCH 4/8] Add PFOR encoding benchmark Benchmarks encode/decode throughput for int32/int64 across 8 data distributions inspired by Snowflake's NumericComprBenchmark: constant, sequential, small range, high-base-small-range (timestamps), with outliers (exception path), random, TPC-DS date keys, TPC-DS quantity. Uses junit-benchmarks (matches existing delta encoding benchmarks). Prints compression ratios for all distributions during setup. Excluded from normal test runs by surefire's benchmark exclusion. --- .../pfor/benchmark/BenchmarkPforEncoding.java | 540 ++++++++++++++++++ 1 file changed, 540 insertions(+) create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/pfor/benchmark/BenchmarkPforEncoding.java diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/benchmark/BenchmarkPforEncoding.java b/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/benchmark/BenchmarkPforEncoding.java new file mode 100644 index 0000000000..31312ec914 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/benchmark/BenchmarkPforEncoding.java @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.pfor.benchmark; + +import com.carrotsearch.junitbenchmarks.BenchmarkOptions; +import com.carrotsearch.junitbenchmarks.BenchmarkRule; +import com.carrotsearch.junitbenchmarks.annotation.AxisRange; +import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart; +import java.io.IOException; +import java.util.Random; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.DirectByteBufferAllocator; +import org.apache.parquet.column.values.pfor.PforValuesReaderForInt; +import org.apache.parquet.column.values.pfor.PforValuesReaderForLong; +import org.apache.parquet.column.values.pfor.PforValuesWriter; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * PFOR encoding/decoding benchmarks for INT32 and INT64. + * + *

Data distributions are inspired by Snowflake's NumericComprBenchmark.cpp, + * covering key archetypes that exercise PFOR's cost model differently: + *

+ * + *

Excluded from normal test runs via surefire's benchmark exclusion pattern. + */ +@AxisRange(min = 0, max = 1) +@BenchmarkMethodChart(filePrefix = "benchmark-pfor-encoding") +public class BenchmarkPforEncoding { + + private static final int NUM_VALUES = 500_000; + + @Rule + public org.junit.rules.TestRule benchmarkRun = new BenchmarkRule(); + + // ========== Pre-computed data ========== + private static int[] intConstant; + private static int[] intSequential; + private static int[] intSmallRange; + private static int[] intHighBaseSmallRange; + private static int[] intWithOutliers; + private static int[] intRandom; + private static int[] intTpcdsSoldDateSk; + private static int[] intTpcdsQuantity; + + private static long[] longConstant; + private static long[] longSequential; + private static long[] longSmallRange; + private static long[] longHighBaseSmallRange; + private static long[] longWithOutliers; + private static long[] longRandom; + private static long[] longTpcdsSoldDateSk; + + // Pre-encoded bytes for decode benchmarks + private static byte[] intConstantBytes; + private static byte[] intSequentialBytes; + private static byte[] intSmallRangeBytes; + private static byte[] intHighBaseSmallRangeBytes; + private static byte[] intWithOutliersBytes; + private static byte[] intRandomBytes; + private static byte[] intTpcdsSoldDateSkBytes; + private static byte[] intTpcdsQuantityBytes; + + private static byte[] longConstantBytes; + private static byte[] longSequentialBytes; + private static byte[] longSmallRangeBytes; + private static byte[] longHighBaseSmallRangeBytes; + private static byte[] longWithOutliersBytes; + private static byte[] longRandomBytes; + private static byte[] longTpcdsSoldDateSkBytes; + + @BeforeClass + public static void prepare() throws IOException { + // Generate INT32 distributions + intConstant = genIntConstant(NUM_VALUES); + intSequential = genIntSequential(NUM_VALUES); + intSmallRange = genIntSmallRange(NUM_VALUES); + intHighBaseSmallRange = genIntHighBaseSmallRange(NUM_VALUES); + intWithOutliers = genIntWithOutliers(NUM_VALUES); + intRandom = genIntRandom(NUM_VALUES); + intTpcdsSoldDateSk = genIntTpcdsSoldDateSk(NUM_VALUES); + intTpcdsQuantity = genIntTpcdsQuantity(NUM_VALUES); + + // Generate INT64 distributions + longConstant = genLongConstant(NUM_VALUES); + longSequential = genLongSequential(NUM_VALUES); + longSmallRange = genLongSmallRange(NUM_VALUES); + longHighBaseSmallRange = genLongHighBaseSmallRange(NUM_VALUES); + longWithOutliers = genLongWithOutliers(NUM_VALUES); + longRandom = genLongRandom(NUM_VALUES); + longTpcdsSoldDateSk = genLongTpcdsSoldDateSk(NUM_VALUES); + + // Pre-encode for decode benchmarks + intConstantBytes = encodeInts(intConstant); + intSequentialBytes = encodeInts(intSequential); + intSmallRangeBytes = encodeInts(intSmallRange); + intHighBaseSmallRangeBytes = encodeInts(intHighBaseSmallRange); + intWithOutliersBytes = encodeInts(intWithOutliers); + intRandomBytes = encodeInts(intRandom); + intTpcdsSoldDateSkBytes = encodeInts(intTpcdsSoldDateSk); + intTpcdsQuantityBytes = encodeInts(intTpcdsQuantity); + + longConstantBytes = encodeLongs(longConstant); + longSequentialBytes = encodeLongs(longSequential); + longSmallRangeBytes = encodeLongs(longSmallRange); + longHighBaseSmallRangeBytes = encodeLongs(longHighBaseSmallRange); + longWithOutliersBytes = encodeLongs(longWithOutliers); + longRandomBytes = encodeLongs(longRandom); + longTpcdsSoldDateSkBytes = encodeLongs(longTpcdsSoldDateSk); + + // Print compression ratios + System.out.println("=== PFOR Compression Ratios (compressed / uncompressed) ==="); + printIntRatio("Constant", intConstantBytes, intConstant.length); + printIntRatio("Sequential", intSequentialBytes, intSequential.length); + printIntRatio("SmallRange", intSmallRangeBytes, intSmallRange.length); + printIntRatio("HighBaseSmallRange", intHighBaseSmallRangeBytes, intHighBaseSmallRange.length); + printIntRatio("WithOutliers", intWithOutliersBytes, intWithOutliers.length); + printIntRatio("Random", intRandomBytes, intRandom.length); + printIntRatio("TpcdsSoldDateSk", intTpcdsSoldDateSkBytes, intTpcdsSoldDateSk.length); + printIntRatio("TpcdsQuantity", intTpcdsQuantityBytes, intTpcdsQuantity.length); + System.out.println("---"); + printLongRatio("Constant", longConstantBytes, longConstant.length); + printLongRatio("Sequential", longSequentialBytes, longSequential.length); + printLongRatio("SmallRange", longSmallRangeBytes, longSmallRange.length); + printLongRatio("HighBaseSmallRange", longHighBaseSmallRangeBytes, longHighBaseSmallRange.length); + printLongRatio("WithOutliers", longWithOutliersBytes, longWithOutliers.length); + printLongRatio("Random", longRandomBytes, longRandom.length); + printLongRatio("TpcdsSoldDateSk", longTpcdsSoldDateSkBytes, longTpcdsSoldDateSk.length); + } + + // ========== INT32 Encode Benchmarks ========== + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void encodeIntConstant() throws IOException { + benchmarkIntEncode(intConstant); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void encodeIntSequential() throws IOException { + benchmarkIntEncode(intSequential); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void encodeIntSmallRange() throws IOException { + benchmarkIntEncode(intSmallRange); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void encodeIntHighBaseSmallRange() throws IOException { + benchmarkIntEncode(intHighBaseSmallRange); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void encodeIntWithOutliers() throws IOException { + benchmarkIntEncode(intWithOutliers); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void encodeIntRandom() throws IOException { + benchmarkIntEncode(intRandom); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void encodeIntTpcdsSoldDateSk() throws IOException { + benchmarkIntEncode(intTpcdsSoldDateSk); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void encodeIntTpcdsQuantity() throws IOException { + benchmarkIntEncode(intTpcdsQuantity); + } + + // ========== INT32 Decode Benchmarks ========== + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void decodeIntConstant() throws IOException { + benchmarkIntDecode(intConstantBytes, intConstant.length); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void decodeIntSequential() throws IOException { + benchmarkIntDecode(intSequentialBytes, intSequential.length); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void decodeIntSmallRange() throws IOException { + benchmarkIntDecode(intSmallRangeBytes, intSmallRange.length); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void decodeIntHighBaseSmallRange() throws IOException { + benchmarkIntDecode(intHighBaseSmallRangeBytes, intHighBaseSmallRange.length); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void decodeIntWithOutliers() throws IOException { + benchmarkIntDecode(intWithOutliersBytes, intWithOutliers.length); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void decodeIntRandom() throws IOException { + benchmarkIntDecode(intRandomBytes, intRandom.length); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void decodeIntTpcdsSoldDateSk() throws IOException { + benchmarkIntDecode(intTpcdsSoldDateSkBytes, intTpcdsSoldDateSk.length); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void decodeIntTpcdsQuantity() throws IOException { + benchmarkIntDecode(intTpcdsQuantityBytes, intTpcdsQuantity.length); + } + + // ========== INT64 Encode Benchmarks ========== + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void encodeLongConstant() throws IOException { + benchmarkLongEncode(longConstant); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void encodeLongSequential() throws IOException { + benchmarkLongEncode(longSequential); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void encodeLongSmallRange() throws IOException { + benchmarkLongEncode(longSmallRange); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void encodeLongHighBaseSmallRange() throws IOException { + benchmarkLongEncode(longHighBaseSmallRange); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void encodeLongWithOutliers() throws IOException { + benchmarkLongEncode(longWithOutliers); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void encodeLongRandom() throws IOException { + benchmarkLongEncode(longRandom); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void encodeLongTpcdsSoldDateSk() throws IOException { + benchmarkLongEncode(longTpcdsSoldDateSk); + } + + // ========== INT64 Decode Benchmarks ========== + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void decodeLongConstant() throws IOException { + benchmarkLongDecode(longConstantBytes, longConstant.length); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void decodeLongSequential() throws IOException { + benchmarkLongDecode(longSequentialBytes, longSequential.length); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void decodeLongSmallRange() throws IOException { + benchmarkLongDecode(longSmallRangeBytes, longSmallRange.length); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void decodeLongHighBaseSmallRange() throws IOException { + benchmarkLongDecode(longHighBaseSmallRangeBytes, longHighBaseSmallRange.length); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void decodeLongWithOutliers() throws IOException { + benchmarkLongDecode(longWithOutliersBytes, longWithOutliers.length); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void decodeLongRandom() throws IOException { + benchmarkLongDecode(longRandomBytes, longRandom.length); + } + + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) + @Test + public void decodeLongTpcdsSoldDateSk() throws IOException { + benchmarkLongDecode(longTpcdsSoldDateSkBytes, longTpcdsSoldDateSk.length); + } + + // ========== Benchmark Helpers ========== + + private void benchmarkIntEncode(int[] values) throws IOException { + int capacity = Math.max(256, values.length * 8); + PforValuesWriter.IntPforValuesWriter writer = + new PforValuesWriter.IntPforValuesWriter(capacity, capacity, new DirectByteBufferAllocator()); + for (int v : values) { + writer.writeInteger(v); + } + writer.getBytes(); + writer.close(); + } + + private void benchmarkIntDecode(byte[] encoded, int numValues) throws IOException { + PforValuesReaderForInt reader = new PforValuesReaderForInt(); + reader.initFromPage(numValues, + ByteBufferInputStream.wrap(java.nio.ByteBuffer.wrap(encoded))); + for (int i = 0; i < numValues; i++) { + reader.readInteger(); + } + } + + private void benchmarkLongEncode(long[] values) throws IOException { + int capacity = Math.max(512, values.length * 16); + PforValuesWriter.LongPforValuesWriter writer = + new PforValuesWriter.LongPforValuesWriter(capacity, capacity, new DirectByteBufferAllocator()); + for (long v : values) { + writer.writeLong(v); + } + writer.getBytes(); + writer.close(); + } + + private void benchmarkLongDecode(byte[] encoded, int numValues) throws IOException { + PforValuesReaderForLong reader = new PforValuesReaderForLong(); + reader.initFromPage(numValues, + ByteBufferInputStream.wrap(java.nio.ByteBuffer.wrap(encoded))); + for (int i = 0; i < numValues; i++) { + reader.readLong(); + } + } + + // ========== Data Generators ========== + + private static int[] genIntConstant(int n) { + int[] v = new int[n]; + java.util.Arrays.fill(v, 42); + return v; + } + + private static int[] genIntSequential(int n) { + int[] v = new int[n]; + for (int i = 0; i < n; i++) v[i] = i; + return v; + } + + private static int[] genIntSmallRange(int n) { + int[] v = new int[n]; + Random rng = new Random(12345); + for (int i = 0; i < n; i++) v[i] = 100000 + rng.nextInt(100001); + return v; + } + + private static int[] genIntHighBaseSmallRange(int n) { + int[] v = new int[n]; + Random rng = new Random(12345); + for (int i = 0; i < n; i++) v[i] = 1704067200 + rng.nextInt(1001); + return v; + } + + private static int[] genIntWithOutliers(int n) { + int[] v = new int[n]; + Random rng = new Random(42); + for (int i = 0; i < n; i++) v[i] = 1000 + rng.nextInt(256); + // ~1% outliers + int numOutliers = Math.max(1, n / 100); + for (int i = 0; i < numOutliers; i++) { + v[rng.nextInt(n)] = Integer.MAX_VALUE / 2 + i; + } + return v; + } + + private static int[] genIntRandom(int n) { + int[] v = new int[n]; + Random rng = new Random(99); + for (int i = 0; i < n; i++) v[i] = rng.nextInt(); + return v; + } + + private static int[] genIntTpcdsSoldDateSk(int n) { + int[] v = new int[n]; + Random rng = new Random(12345); + for (int i = 0; i < n; i++) v[i] = 2450815 + rng.nextInt(1821); + return v; + } + + private static int[] genIntTpcdsQuantity(int n) { + int[] v = new int[n]; + Random rng = new Random(12345); + for (int i = 0; i < n; i++) { + v[i] = (rng.nextInt(100) < 90) ? (1 + rng.nextInt(10)) : (11 + rng.nextInt(90)); + } + return v; + } + + private static long[] genLongConstant(int n) { + long[] v = new long[n]; + java.util.Arrays.fill(v, 42L); + return v; + } + + private static long[] genLongSequential(int n) { + long[] v = new long[n]; + for (int i = 0; i < n; i++) v[i] = i; + return v; + } + + private static long[] genLongSmallRange(int n) { + long[] v = new long[n]; + Random rng = new Random(12345); + for (int i = 0; i < n; i++) v[i] = 100000L + rng.nextInt(100001); + return v; + } + + private static long[] genLongHighBaseSmallRange(int n) { + long[] v = new long[n]; + Random rng = new Random(12345); + for (int i = 0; i < n; i++) v[i] = 1704067200L + rng.nextInt(1001); + return v; + } + + private static long[] genLongWithOutliers(int n) { + long[] v = new long[n]; + Random rng = new Random(42); + for (int i = 0; i < n; i++) v[i] = 1000L + rng.nextInt(256); + int numOutliers = Math.max(1, n / 100); + for (int i = 0; i < numOutliers; i++) { + v[rng.nextInt(n)] = Long.MAX_VALUE / 2 + i; + } + return v; + } + + private static long[] genLongRandom(int n) { + long[] v = new long[n]; + Random rng = new Random(99); + for (int i = 0; i < n; i++) v[i] = rng.nextLong(); + return v; + } + + private static long[] genLongTpcdsSoldDateSk(int n) { + long[] v = new long[n]; + Random rng = new Random(12345); + for (int i = 0; i < n; i++) v[i] = 2450815L + rng.nextInt(1821); + return v; + } + + // ========== Encoding Helpers ========== + + private static byte[] encodeInts(int[] values) throws IOException { + int capacity = Math.max(256, values.length * 8); + PforValuesWriter.IntPforValuesWriter writer = + new PforValuesWriter.IntPforValuesWriter(capacity, capacity, new DirectByteBufferAllocator()); + for (int v : values) { + writer.writeInteger(v); + } + byte[] result = writer.getBytes().toByteArray(); + writer.close(); + return result; + } + + private static byte[] encodeLongs(long[] values) throws IOException { + int capacity = Math.max(512, values.length * 16); + PforValuesWriter.LongPforValuesWriter writer = + new PforValuesWriter.LongPforValuesWriter(capacity, capacity, new DirectByteBufferAllocator()); + for (long v : values) { + writer.writeLong(v); + } + byte[] result = writer.getBytes().toByteArray(); + writer.close(); + return result; + } + + private static void printIntRatio(String name, byte[] encoded, int numValues) { + double ratio = 100.0 * encoded.length / (numValues * 4); + System.out.printf(" INT32 %-25s: %6d bytes -> %6d bytes (%.1f%%)\n", + name, numValues * 4, encoded.length, ratio); + } + + private static void printLongRatio(String name, byte[] encoded, int numValues) { + double ratio = 100.0 * encoded.length / (numValues * 8); + System.out.printf(" INT64 %-25s: %6d bytes -> %6d bytes (%.1f%%)\n", + name, numValues * 8, encoded.length, ratio); + } +} From c10fa78dbd8ab548f094d935edde6713ae49ccd8 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Wed, 3 Jun 2026 18:57:20 +0000 Subject: [PATCH 5/8] Reduce per-vector allocations and harden reader validation Writer: - Pre-allocate reusable buffers (deltasBuffer, excPosBuffer, excValBuffer, metadataBuf, packBuf, packPadBuf) in constructor instead of allocating new arrays on every encodeAndFlushVector call - Replace ByteBuffer.allocate().order(LITTLE_ENDIAN) with manual byte shifts into reusable metadataBuf for vector info and exception writes - Emit valid header for totalCount==0 (reader can distinguish empty page from missing encoding) instead of BytesInput.empty() Reader: - Add numElements > valuesCount validation (handles nullable columns where page row count > encoded values) - Move getShortLE/getIntLE/getLongLE from private static in concrete readers to protected static in PforValuesReader base class --- .../column/values/pfor/PforValuesReader.java | 26 +++ .../values/pfor/PforValuesReaderForInt.java | 10 - .../values/pfor/PforValuesReaderForLong.java | 14 -- .../column/values/pfor/PforValuesWriter.java | 184 +++++++++++------- 4 files changed, 137 insertions(+), 97 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReader.java index 25275b8179..8f891235ad 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReader.java @@ -83,6 +83,10 @@ public void initFromPage(int valuesCount, ByteBufferInputStream stream) if (numElements < 0) { throw new ParquetDecodingException("Invalid PFOR element count: " + numElements); } + if (numElements > valuesCount) { + throw new ParquetDecodingException( + "PFOR header element count " + numElements + " exceeds page valuesCount " + valuesCount); + } this.vectorSize = 1 << logVectorSize; this.totalCount = numElements; @@ -148,4 +152,26 @@ protected void ensureVectorDecoded() { protected abstract void allocateDecodedBuffer(int capacity); protected abstract void decodeVector(int vectorIdx); + + protected static int getShortLE(ByteBuffer buf, int pos) { + return (buf.get(pos) & 0xFF) | ((buf.get(pos + 1) & 0xFF) << 8); + } + + protected static int getIntLE(ByteBuffer buf, int pos) { + return (buf.get(pos) & 0xFF) + | ((buf.get(pos + 1) & 0xFF) << 8) + | ((buf.get(pos + 2) & 0xFF) << 16) + | ((buf.get(pos + 3) & 0xFF) << 24); + } + + protected static long getLongLE(ByteBuffer buf, int pos) { + return (buf.get(pos) & 0xFFL) + | ((buf.get(pos + 1) & 0xFFL) << 8) + | ((buf.get(pos + 2) & 0xFFL) << 16) + | ((buf.get(pos + 3) & 0xFFL) << 24) + | ((buf.get(pos + 4) & 0xFFL) << 32) + | ((buf.get(pos + 5) & 0xFFL) << 40) + | ((buf.get(pos + 6) & 0xFFL) << 48) + | ((buf.get(pos + 7) & 0xFFL) << 56); + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForInt.java b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForInt.java index 420a188098..b5d9fec57e 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForInt.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForInt.java @@ -129,14 +129,4 @@ private int unpackIntsWithBytePacker(ByteBuffer buf, int pos, int[] output, int return pos; } - private static int getShortLE(ByteBuffer buf, int pos) { - return (buf.get(pos) & 0xFF) | ((buf.get(pos + 1) & 0xFF) << 8); - } - - private static int getIntLE(ByteBuffer buf, int pos) { - return (buf.get(pos) & 0xFF) - | ((buf.get(pos + 1) & 0xFF) << 8) - | ((buf.get(pos + 2) & 0xFF) << 16) - | ((buf.get(pos + 3) & 0xFF) << 24); - } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForLong.java b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForLong.java index 91ac244369..3afbf0943b 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForLong.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForLong.java @@ -130,18 +130,4 @@ private int unpackLongsWithBytePacker(ByteBuffer buf, int pos, long[] output, in return pos; } - private static int getShortLE(ByteBuffer buf, int pos) { - return (buf.get(pos) & 0xFF) | ((buf.get(pos + 1) & 0xFF) << 8); - } - - private static long getLongLE(ByteBuffer buf, int pos) { - return (buf.get(pos) & 0xFFL) - | ((buf.get(pos + 1) & 0xFFL) << 8) - | ((buf.get(pos + 2) & 0xFFL) << 16) - | ((buf.get(pos + 3) & 0xFFL) << 24) - | ((buf.get(pos + 4) & 0xFFL) << 32) - | ((buf.get(pos + 5) & 0xFFL) << 40) - | ((buf.get(pos + 6) & 0xFFL) << 48) - | ((buf.get(pos + 7) & 0xFFL) << 56); - } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesWriter.java index ab39301dad..8cc5bd5e6f 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesWriter.java @@ -86,6 +86,14 @@ public static class IntPforValuesWriter extends PforValuesWriter { private CapacityByteArrayOutputStream encodedVectors; private final List vectorByteSizes; + // Reusable per-vector buffers to avoid allocations on every encodeAndFlushVector call + private final int[] deltasBuffer; + private final short[] excPosBuffer; + private final int[] excValBuffer; + private final byte[] metadataBuf; + private final byte[] packBuf; + private final int[] packPadBuf; + public IntPforValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) { this(initialCapacity, pageSize, allocator, DEFAULT_VECTOR_SIZE); } @@ -97,6 +105,12 @@ public IntPforValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocato this.totalCount = 0; this.encodedVectors = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator); this.vectorByteSizes = new ArrayList<>(); + this.deltasBuffer = new int[vectorSize]; + this.excPosBuffer = new short[vectorSize]; + this.excValBuffer = new int[vectorSize]; + this.metadataBuf = new byte[INT32_VECTOR_INFO_SIZE]; + this.packBuf = new byte[Integer.SIZE]; // max bit width for int = 32 bytes + this.packPadBuf = new int[8]; } @Override @@ -118,30 +132,26 @@ private void encodeAndFlushVector(int vectorLen) { } } - // Compute unsigned deltas - int[] deltas = new int[vectorLen]; + // Compute unsigned deltas into reusable buffer for (int i = 0; i < vectorLen; i++) { - deltas[i] = vectorBuffer[i] - minValue; + deltasBuffer[i] = vectorBuffer[i] - minValue; } // Find optimal bit width via cost model - PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForInt(deltas, vectorLen); + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForInt(deltasBuffer, vectorLen); int bitWidth = result.bitWidth; int numExceptions = result.numExceptions; // Collect exceptions: values whose delta doesn't fit in bitWidth bits - short[] excPositions = new short[numExceptions]; - int[] excValues = new int[numExceptions]; int excIdx = 0; - if (numExceptions > 0) { int mask = (bitWidth == 32) ? -1 : (1 << bitWidth) - 1; for (int i = 0; i < vectorLen; i++) { - if (Integer.compareUnsigned(deltas[i], mask) > 0) { - excPositions[excIdx] = (short) i; - excValues[excIdx] = vectorBuffer[i]; // original value, not delta + if (Integer.compareUnsigned(deltasBuffer[i], mask) > 0) { + excPosBuffer[excIdx] = (short) i; + excValBuffer[excIdx] = vectorBuffer[i]; excIdx++; - deltas[i] = 0; // placeholder in packed data + deltasBuffer[i] = 0; } } } @@ -149,32 +159,37 @@ private void encodeAndFlushVector(int vectorLen) { long startSize = encodedVectors.size(); // PforVectorInfo: frame_of_reference(4) + bit_width(1) + num_exceptions(2) = 7B - ByteBuffer vectorInfo = ByteBuffer.allocate(INT32_VECTOR_INFO_SIZE).order(ByteOrder.LITTLE_ENDIAN); - vectorInfo.putInt(minValue); - vectorInfo.put((byte) bitWidth); - vectorInfo.putShort((short) numExceptions); - encodedVectors.write(vectorInfo.array(), 0, INT32_VECTOR_INFO_SIZE); + metadataBuf[0] = (byte) (minValue & 0xFF); + metadataBuf[1] = (byte) ((minValue >>> 8) & 0xFF); + metadataBuf[2] = (byte) ((minValue >>> 16) & 0xFF); + metadataBuf[3] = (byte) ((minValue >>> 24) & 0xFF); + metadataBuf[4] = (byte) bitWidth; + metadataBuf[5] = (byte) (numExceptions & 0xFF); + metadataBuf[6] = (byte) ((numExceptions >>> 8) & 0xFF); + encodedVectors.write(metadataBuf, 0, INT32_VECTOR_INFO_SIZE); // Pack deltas if (bitWidth > 0) { - packIntsWithBytePacker(deltas, vectorLen, bitWidth); + packIntsWithBytePacker(deltasBuffer, vectorLen, bitWidth); } // Exception positions then values if (numExceptions > 0) { - ByteBuffer excPosBuf = - ByteBuffer.allocate(numExceptions * Short.BYTES).order(ByteOrder.LITTLE_ENDIAN); for (int i = 0; i < numExceptions; i++) { - excPosBuf.putShort(excPositions[i]); + int pos = excPosBuffer[i] & 0xFFFF; + metadataBuf[0] = (byte) (pos & 0xFF); + metadataBuf[1] = (byte) ((pos >>> 8) & 0xFF); + encodedVectors.write(metadataBuf, 0, Short.BYTES); } - encodedVectors.write(excPosBuf.array(), 0, numExceptions * Short.BYTES); - ByteBuffer excValBuf = - ByteBuffer.allocate(numExceptions * Integer.BYTES).order(ByteOrder.LITTLE_ENDIAN); for (int i = 0; i < numExceptions; i++) { - excValBuf.putInt(excValues[i]); + int val = excValBuffer[i]; + metadataBuf[0] = (byte) (val & 0xFF); + metadataBuf[1] = (byte) ((val >>> 8) & 0xFF); + metadataBuf[2] = (byte) ((val >>> 16) & 0xFF); + metadataBuf[3] = (byte) ((val >>> 24) & 0xFF); + encodedVectors.write(metadataBuf, 0, Integer.BYTES); } - encodedVectors.write(excValBuf.array(), 0, numExceptions * Integer.BYTES); } vectorByteSizes.add((int) (encodedVectors.size() - startSize)); @@ -184,22 +199,21 @@ private void packIntsWithBytePacker(int[] values, int count, int bitWidth) { BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); int numFullGroups = count / 8; int remaining = count % 8; - byte[] packed = new byte[bitWidth]; for (int g = 0; g < numFullGroups; g++) { - packer.pack8Values(values, g * 8, packed, 0); - encodedVectors.write(packed, 0, bitWidth); + packer.pack8Values(values, g * 8, packBuf, 0); + encodedVectors.write(packBuf, 0, bitWidth); } - // Partial last group: pack 8 values (zero-padded), but only write - // ceil(count * bitWidth / 8) - alreadyWritten bytes per spec. if (remaining > 0) { - int[] padded = new int[8]; - System.arraycopy(values, numFullGroups * 8, padded, 0, remaining); - packer.pack8Values(padded, 0, packed, 0); + System.arraycopy(values, numFullGroups * 8, packPadBuf, 0, remaining); + for (int i = remaining; i < 8; i++) { + packPadBuf[i] = 0; + } + packer.pack8Values(packPadBuf, 0, packBuf, 0); int totalPackedBytes = (count * bitWidth + 7) / 8; int alreadyWritten = numFullGroups * bitWidth; - encodedVectors.write(packed, 0, totalPackedBytes - alreadyWritten); + encodedVectors.write(packBuf, 0, totalPackedBytes - alreadyWritten); } } @@ -210,10 +224,6 @@ public long getBufferedSize() { @Override public BytesInput getBytes() { - if (totalCount == 0) { - return BytesInput.empty(); - } - if (bufferCount > 0) { encodeAndFlushVector(bufferCount); bufferCount = 0; @@ -228,6 +238,10 @@ public BytesInput getBytes() { header.put((byte) INT32_VALUE_BYTE_WIDTH); header.putInt(totalCount); + if (totalCount == 0) { + return BytesInput.from(header.array()); + } + int offsetArraySize = numVectors * Integer.BYTES; ByteBuffer offsets = ByteBuffer.allocate(offsetArraySize).order(ByteOrder.LITTLE_ENDIAN); int currentOffset = offsetArraySize; @@ -273,6 +287,14 @@ public static class LongPforValuesWriter extends PforValuesWriter { private CapacityByteArrayOutputStream encodedVectors; private final List vectorByteSizes; + // Reusable per-vector buffers + private final long[] deltasBuffer; + private final short[] excPosBuffer; + private final long[] excValBuffer; + private final byte[] metadataBuf; + private final byte[] packBuf; + private final long[] packPadBuf; + public LongPforValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) { this(initialCapacity, pageSize, allocator, DEFAULT_VECTOR_SIZE); } @@ -284,6 +306,12 @@ public LongPforValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocat this.totalCount = 0; this.encodedVectors = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator); this.vectorByteSizes = new ArrayList<>(); + this.deltasBuffer = new long[vectorSize]; + this.excPosBuffer = new short[vectorSize]; + this.excValBuffer = new long[vectorSize]; + this.metadataBuf = new byte[INT64_VECTOR_INFO_SIZE]; + this.packBuf = new byte[Long.SIZE]; // max bit width for long = 64 bytes + this.packPadBuf = new long[8]; } @Override @@ -304,27 +332,23 @@ private void encodeAndFlushVector(int vectorLen) { } } - long[] deltas = new long[vectorLen]; for (int i = 0; i < vectorLen; i++) { - deltas[i] = vectorBuffer[i] - minValue; + deltasBuffer[i] = vectorBuffer[i] - minValue; } - PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForLong(deltas, vectorLen); + PforEncoderDecoder.BitWidthResult result = PforEncoderDecoder.findOptimalBitWidthForLong(deltasBuffer, vectorLen); int bitWidth = result.bitWidth; int numExceptions = result.numExceptions; - short[] excPositions = new short[numExceptions]; - long[] excValues = new long[numExceptions]; int excIdx = 0; - if (numExceptions > 0) { long mask = (bitWidth == 64) ? -1L : (1L << bitWidth) - 1L; for (int i = 0; i < vectorLen; i++) { - if (Long.compareUnsigned(deltas[i], mask) > 0) { - excPositions[excIdx] = (short) i; - excValues[excIdx] = vectorBuffer[i]; // original value + if (Long.compareUnsigned(deltasBuffer[i], mask) > 0) { + excPosBuffer[excIdx] = (short) i; + excValBuffer[excIdx] = vectorBuffer[i]; excIdx++; - deltas[i] = 0; // placeholder + deltasBuffer[i] = 0; } } } @@ -332,30 +356,43 @@ private void encodeAndFlushVector(int vectorLen) { long startSize = encodedVectors.size(); // PforVectorInfo: frame_of_reference(8) + bit_width(1) + num_exceptions(2) = 11B - ByteBuffer vectorInfo = ByteBuffer.allocate(INT64_VECTOR_INFO_SIZE).order(ByteOrder.LITTLE_ENDIAN); - vectorInfo.putLong(minValue); - vectorInfo.put((byte) bitWidth); - vectorInfo.putShort((short) numExceptions); - encodedVectors.write(vectorInfo.array(), 0, INT64_VECTOR_INFO_SIZE); + metadataBuf[0] = (byte) (minValue & 0xFF); + metadataBuf[1] = (byte) ((minValue >>> 8) & 0xFF); + metadataBuf[2] = (byte) ((minValue >>> 16) & 0xFF); + metadataBuf[3] = (byte) ((minValue >>> 24) & 0xFF); + metadataBuf[4] = (byte) ((minValue >>> 32) & 0xFF); + metadataBuf[5] = (byte) ((minValue >>> 40) & 0xFF); + metadataBuf[6] = (byte) ((minValue >>> 48) & 0xFF); + metadataBuf[7] = (byte) ((minValue >>> 56) & 0xFF); + metadataBuf[8] = (byte) bitWidth; + metadataBuf[9] = (byte) (numExceptions & 0xFF); + metadataBuf[10] = (byte) ((numExceptions >>> 8) & 0xFF); + encodedVectors.write(metadataBuf, 0, INT64_VECTOR_INFO_SIZE); if (bitWidth > 0) { - packLongsWithBytePacker(deltas, vectorLen, bitWidth); + packLongsWithBytePacker(deltasBuffer, vectorLen, bitWidth); } if (numExceptions > 0) { - ByteBuffer excPosBuf = - ByteBuffer.allocate(numExceptions * Short.BYTES).order(ByteOrder.LITTLE_ENDIAN); for (int i = 0; i < numExceptions; i++) { - excPosBuf.putShort(excPositions[i]); + int pos = excPosBuffer[i] & 0xFFFF; + metadataBuf[0] = (byte) (pos & 0xFF); + metadataBuf[1] = (byte) ((pos >>> 8) & 0xFF); + encodedVectors.write(metadataBuf, 0, Short.BYTES); } - encodedVectors.write(excPosBuf.array(), 0, numExceptions * Short.BYTES); - ByteBuffer excValBuf = - ByteBuffer.allocate(numExceptions * Long.BYTES).order(ByteOrder.LITTLE_ENDIAN); for (int i = 0; i < numExceptions; i++) { - excValBuf.putLong(excValues[i]); + long val = excValBuffer[i]; + metadataBuf[0] = (byte) (val & 0xFF); + metadataBuf[1] = (byte) ((val >>> 8) & 0xFF); + metadataBuf[2] = (byte) ((val >>> 16) & 0xFF); + metadataBuf[3] = (byte) ((val >>> 24) & 0xFF); + metadataBuf[4] = (byte) ((val >>> 32) & 0xFF); + metadataBuf[5] = (byte) ((val >>> 40) & 0xFF); + metadataBuf[6] = (byte) ((val >>> 48) & 0xFF); + metadataBuf[7] = (byte) ((val >>> 56) & 0xFF); + encodedVectors.write(metadataBuf, 0, Long.BYTES); } - encodedVectors.write(excValBuf.array(), 0, numExceptions * Long.BYTES); } vectorByteSizes.add((int) (encodedVectors.size() - startSize)); @@ -365,20 +402,21 @@ private void packLongsWithBytePacker(long[] values, int count, int bitWidth) { BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(bitWidth); int numFullGroups = count / 8; int remaining = count % 8; - byte[] packed = new byte[bitWidth]; for (int g = 0; g < numFullGroups; g++) { - packer.pack8Values(values, g * 8, packed, 0); - encodedVectors.write(packed, 0, bitWidth); + packer.pack8Values(values, g * 8, packBuf, 0); + encodedVectors.write(packBuf, 0, bitWidth); } if (remaining > 0) { - long[] padded = new long[8]; - System.arraycopy(values, numFullGroups * 8, padded, 0, remaining); - packer.pack8Values(padded, 0, packed, 0); + System.arraycopy(values, numFullGroups * 8, packPadBuf, 0, remaining); + for (int i = remaining; i < 8; i++) { + packPadBuf[i] = 0; + } + packer.pack8Values(packPadBuf, 0, packBuf, 0); int totalPackedBytes = (count * bitWidth + 7) / 8; int alreadyWritten = numFullGroups * bitWidth; - encodedVectors.write(packed, 0, totalPackedBytes - alreadyWritten); + encodedVectors.write(packBuf, 0, totalPackedBytes - alreadyWritten); } } @@ -389,10 +427,6 @@ public long getBufferedSize() { @Override public BytesInput getBytes() { - if (totalCount == 0) { - return BytesInput.empty(); - } - if (bufferCount > 0) { encodeAndFlushVector(bufferCount); bufferCount = 0; @@ -406,6 +440,10 @@ public BytesInput getBytes() { header.put((byte) INT64_VALUE_BYTE_WIDTH); header.putInt(totalCount); + if (totalCount == 0) { + return BytesInput.from(header.array()); + } + int offsetArraySize = numVectors * Integer.BYTES; ByteBuffer offsets = ByteBuffer.allocate(offsetArraySize).order(ByteOrder.LITTLE_ENDIAN); int currentOffset = offsetArraySize; From 1aa3aa4f5ec3f21079e555acc9542932e3b9b289 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Wed, 3 Jun 2026 18:58:37 +0000 Subject: [PATCH 6/8] Add adversarial tests for PFOR reader validation Tests cover: - Bad packing mode, log vector size out of range, bad value byte width - Negative num_elements, numElements > valuesCount - Header-only page, truncated offset array, truncated vector data - Corrupted offset pointing past buffer end - Skip past end, negative skip, read past end - Skip across vector boundaries (correctness check) --- .../values/pfor/PforAdversarialTest.java | 336 ++++++++++++++++++ 1 file changed, 336 insertions(+) create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforAdversarialTest.java diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforAdversarialTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforAdversarialTest.java new file mode 100644 index 0000000000..15d3cc499e --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforAdversarialTest.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.pfor; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.DirectByteBufferAllocator; +import org.apache.parquet.io.ParquetDecodingException; +import org.junit.Test; + +/** + * Adversarial tests for PFOR readers: feed malformed page bytes and assert the reader + * fails cleanly rather than crashing, producing silent garbage, or hanging. + * + *

Covers both explicitly-validated cases (ParquetDecodingException with message) + * and currently-unvalidated cases (IndexOutOfBoundsException or BufferUnderflowException + * from the underlying ByteBuffer). + */ +public class PforAdversarialTest { + + private static final int VECTOR_SIZE = PforConstants.DEFAULT_VECTOR_SIZE; + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + + private static byte[] validIntPage(int valueCount, int vectorSize) throws Exception { + PforValuesWriter.IntPforValuesWriter writer = null; + try { + int cap = Math.max(512, valueCount * 8); + writer = new PforValuesWriter.IntPforValuesWriter( + cap, cap, new DirectByteBufferAllocator(), vectorSize); + for (int i = 0; i < valueCount; i++) { + writer.writeInteger(i * 7 + 3); + } + BytesInput bi = writer.getBytes(); + ByteBuffer bb = bi.toByteBuffer(); + byte[] out = new byte[bb.remaining()]; + bb.duplicate().get(out); + return out; + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + private static byte[] validLongPage(int valueCount, int vectorSize) throws Exception { + PforValuesWriter.LongPforValuesWriter writer = null; + try { + int cap = Math.max(512, valueCount * 16); + writer = new PforValuesWriter.LongPforValuesWriter( + cap, cap, new DirectByteBufferAllocator(), vectorSize); + for (int i = 0; i < valueCount; i++) { + writer.writeLong((long) i * 13 + 5); + } + BytesInput bi = writer.getBytes(); + ByteBuffer bb = bi.toByteBuffer(); + byte[] out = new byte[bb.remaining()]; + bb.duplicate().get(out); + return out; + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + private static byte[] mutate(byte[] original, int offset, byte value) { + byte[] copy = original.clone(); + copy[offset] = value; + return copy; + } + + private static byte[] truncate(byte[] original, int newLen) { + byte[] copy = new byte[newLen]; + System.arraycopy(original, 0, copy, 0, newLen); + return copy; + } + + private static void initIntReader(byte[] page, int valuesCount) throws Exception { + PforValuesReaderForInt reader = new PforValuesReaderForInt(); + reader.initFromPage(valuesCount, ByteBufferInputStream.wrap(ByteBuffer.wrap(page))); + reader.readInteger(); + } + + private static void initLongReader(byte[] page, int valuesCount) throws Exception { + PforValuesReaderForLong reader = new PforValuesReaderForLong(); + reader.initFromPage(valuesCount, ByteBufferInputStream.wrap(ByteBuffer.wrap(page))); + reader.readLong(); + } + + // --------------------------------------------------------------------------- + // Sanity: valid pages decode cleanly + // --------------------------------------------------------------------------- + + @Test + public void sanityBaselineDecodesClean() throws Exception { + byte[] page = validIntPage(2048, VECTOR_SIZE); + PforValuesReaderForInt reader = new PforValuesReaderForInt(); + reader.initFromPage(2048, ByteBufferInputStream.wrap(ByteBuffer.wrap(page))); + for (int i = 0; i < 2048; i++) { + reader.readInteger(); + } + } + + @Test + public void sanityBaselineLongDecodesClean() throws Exception { + byte[] page = validLongPage(2048, VECTOR_SIZE); + PforValuesReaderForLong reader = new PforValuesReaderForLong(); + reader.initFromPage(2048, ByteBufferInputStream.wrap(ByteBuffer.wrap(page))); + for (int i = 0; i < 2048; i++) { + reader.readLong(); + } + } + + // --------------------------------------------------------------------------- + // Header validation + // --------------------------------------------------------------------------- + + @Test + public void rejectsBadPackingMode() throws Exception { + byte[] page = validIntPage(1024, VECTOR_SIZE); + byte[] bad = mutate(page, 0, (byte) 99); + assertThrows(ParquetDecodingException.class, () -> initIntReader(bad, 1024)); + } + + @Test + public void rejectsLogVectorSizeTooLarge() throws Exception { + byte[] page = validIntPage(1024, VECTOR_SIZE); + byte[] bad = mutate(page, 1, (byte) 16); // MAX_LOG_VECTOR_SIZE is 15 + assertThrows(ParquetDecodingException.class, () -> initIntReader(bad, 1024)); + } + + @Test + public void rejectsLogVectorSizeTooSmall() throws Exception { + byte[] page = validIntPage(1024, VECTOR_SIZE); + byte[] bad = mutate(page, 1, (byte) 2); // MIN_LOG_VECTOR_SIZE is 3 + assertThrows(ParquetDecodingException.class, () -> initIntReader(bad, 1024)); + } + + @Test + public void rejectsBadValueByteWidth() throws Exception { + byte[] page = validIntPage(1024, VECTOR_SIZE); + byte[] bad = mutate(page, 2, (byte) 3); // must be 4 or 8 + assertThrows(ParquetDecodingException.class, () -> initIntReader(bad, 1024)); + } + + @Test + public void rejectsNegativeNumElements() throws Exception { + byte[] page = validIntPage(1024, VECTOR_SIZE); + // Overwrite num_elements (bytes 3-6) with -1 (0xFFFFFFFF) + byte[] bad = page.clone(); + bad[3] = (byte) 0xFF; + bad[4] = (byte) 0xFF; + bad[5] = (byte) 0xFF; + bad[6] = (byte) 0xFF; + assertThrows(ParquetDecodingException.class, () -> initIntReader(bad, 1024)); + } + + @Test + public void rejectsNumElementsGreaterThanValuesCount() throws Exception { + byte[] page = validIntPage(1024, VECTOR_SIZE); + // page header says 1024 elements but we pass valuesCount=500 + assertThrows(ParquetDecodingException.class, () -> initIntReader(page, 500)); + } + + // --------------------------------------------------------------------------- + // Truncation / corruption + // --------------------------------------------------------------------------- + + @Test + public void rejectsHeaderOnlyPage() { + byte[] page = new byte[PforConstants.PFOR_HEADER_SIZE]; + page[0] = (byte) PforConstants.PFOR_PACKING_MODE_FOR; + page[1] = (byte) PforConstants.DEFAULT_VECTOR_SIZE_LOG; + page[2] = (byte) PforConstants.INT32_VALUE_BYTE_WIDTH; + // num_elements = 100 in LE + page[3] = 100; + page[4] = 0; + page[5] = 0; + page[6] = 0; + + try { + initIntReader(page, 100); + fail("Expected exception for header-only page"); + } catch (Throwable t) { + assertNotNull(t); + } + } + + @Test + public void rejectsPageTruncatedMidOffsetArray() throws Exception { + byte[] page = validIntPage(2048, VECTOR_SIZE); + // Truncate inside the offset array (header=7 + partial offsets) + byte[] bad = truncate(page, PforConstants.PFOR_HEADER_SIZE + 2); + try { + initIntReader(bad, 2048); + fail("Expected exception for page truncated mid offset array"); + } catch (Throwable t) { + assertNotNull(t); + } + } + + @Test + public void rejectsPageTruncatedMidVectorData() throws Exception { + byte[] page = validIntPage(2048, VECTOR_SIZE); + // Keep header + offset array but truncate vector data + int offsetArrayEnd = PforConstants.PFOR_HEADER_SIZE + 2 * Integer.BYTES; + byte[] bad = truncate(page, offsetArrayEnd + 3); + try { + initIntReader(bad, 2048); + fail("Expected exception for page truncated mid vector data"); + } catch (Throwable t) { + assertNotNull(t); + } + } + + @Test + public void rejectsCorruptedOffsetPointingPastEnd() throws Exception { + byte[] page = validIntPage(1024, VECTOR_SIZE); + // The offset array starts at byte 7. Overwrite first offset to point past buffer end. + byte[] bad = page.clone(); + int hugeOffset = page.length * 2; + bad[7] = (byte) (hugeOffset & 0xFF); + bad[8] = (byte) ((hugeOffset >>> 8) & 0xFF); + bad[9] = (byte) ((hugeOffset >>> 16) & 0xFF); + bad[10] = (byte) ((hugeOffset >>> 24) & 0xFF); + try { + initIntReader(bad, 1024); + fail("Expected exception for corrupted offset"); + } catch (Throwable t) { + assertNotNull(t); + } + } + + // --------------------------------------------------------------------------- + // Skip/read bounds + // --------------------------------------------------------------------------- + + @Test + public void rejectsSkipPastEnd() throws Exception { + byte[] page = validIntPage(100, 8); + PforValuesReaderForInt reader = new PforValuesReaderForInt(); + reader.initFromPage(100, ByteBufferInputStream.wrap(ByteBuffer.wrap(page))); + assertThrows(ParquetDecodingException.class, () -> reader.skip(101)); + } + + @Test + public void rejectsNegativeSkip() throws Exception { + byte[] page = validIntPage(100, 8); + PforValuesReaderForInt reader = new PforValuesReaderForInt(); + reader.initFromPage(100, ByteBufferInputStream.wrap(ByteBuffer.wrap(page))); + assertThrows(ParquetDecodingException.class, () -> reader.skip(-1)); + } + + @Test + public void rejectsReadPastEnd() throws Exception { + byte[] page = validIntPage(10, 8); + PforValuesReaderForInt reader = new PforValuesReaderForInt(); + reader.initFromPage(10, ByteBufferInputStream.wrap(ByteBuffer.wrap(page))); + for (int i = 0; i < 10; i++) { + reader.readInteger(); + } + assertThrows(ParquetDecodingException.class, reader::readInteger); + } + + @Test + public void rejectsLongReadPastEnd() throws Exception { + byte[] page = validLongPage(10, 8); + PforValuesReaderForLong reader = new PforValuesReaderForLong(); + reader.initFromPage(10, ByteBufferInputStream.wrap(ByteBuffer.wrap(page))); + for (int i = 0; i < 10; i++) { + reader.readLong(); + } + assertThrows(ParquetDecodingException.class, reader::readLong); + } + + // --------------------------------------------------------------------------- + // Skip across vector boundaries works correctly + // --------------------------------------------------------------------------- + + @Test + public void skipAcrossVectorBoundary() throws Exception { + int vectorSize = 8; + int count = 30; + byte[] page = validIntPage(count, vectorSize); + PforValuesReaderForInt reader = new PforValuesReaderForInt(); + reader.initFromPage(count, ByteBufferInputStream.wrap(ByteBuffer.wrap(page))); + + // Skip past first two vectors (16 values), read from third + reader.skip(16); + int val = reader.readInteger(); + // Expected: 16 * 7 + 3 = 115 + assertTrue("Value after skip should be 115, got: " + val, val == 115); + } + + @Test + public void skipAcrossVectorBoundaryLong() throws Exception { + int vectorSize = 8; + int count = 30; + byte[] page = validLongPage(count, vectorSize); + PforValuesReaderForLong reader = new PforValuesReaderForLong(); + reader.initFromPage(count, ByteBufferInputStream.wrap(ByteBuffer.wrap(page))); + + reader.skip(16); + long val = reader.readLong(); + // Expected: 16 * 13 + 5 = 213 + assertTrue("Value after skip should be 213, got: " + val, val == 213L); + } +} From c14371d7adceba8c2067151c5dc0384eebb8a6fe Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Wed, 3 Jun 2026 19:04:06 +0000 Subject: [PATCH 7/8] Eliminate per-vector allocations in PFOR readers Pre-allocate reusable decode buffers (deltasBuffer, excPositionsBuffer, unpackPadBuf, unpackTempBuf) in allocateDecodedBuffer instead of allocating new arrays on every decodeVector call. Mirrors the writer-side improvement from the previous commit. --- .../values/pfor/PforValuesReaderForInt.java | 38 +++++++++++------- .../values/pfor/PforValuesReaderForLong.java | 39 ++++++++++++------- 2 files changed, 50 insertions(+), 27 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForInt.java b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForInt.java index b5d9fec57e..d91d421a1d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForInt.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForInt.java @@ -43,6 +43,12 @@ public class PforValuesReaderForInt extends PforValuesReader { private int[] decodedValues; + // Reusable per-vector decode buffers + private int[] deltasBuffer; + private int[] excPositionsBuffer; + private byte[] unpackPadBuf; + private int[] unpackTempBuf; + public PforValuesReaderForInt() { super(); } @@ -50,6 +56,10 @@ public PforValuesReaderForInt() { @Override protected void allocateDecodedBuffer(int capacity) { this.decodedValues = new int[capacity]; + this.deltasBuffer = new int[capacity]; + this.excPositionsBuffer = new int[capacity]; + this.unpackPadBuf = new byte[Integer.SIZE]; // max bit width = 32 bytes + this.unpackTempBuf = new int[8]; } @Override @@ -74,32 +84,33 @@ protected void decodeVector(int vectorIdx) { int numExceptions = getShortLE(vectorsData, pos + 5) & 0xFFFF; pos += INT32_VECTOR_INFO_SIZE; - // Unpack bit-packed deltas - int[] deltas = new int[vectorLen]; + // Unpack bit-packed deltas into reusable buffer if (bitWidth > 0) { - pos = unpackIntsWithBytePacker(vectorsData, pos, deltas, vectorLen, bitWidth); + pos = unpackIntsWithBytePacker(vectorsData, pos, deltasBuffer, vectorLen, bitWidth); + } else { + for (int i = 0; i < vectorLen; i++) { + deltasBuffer[i] = 0; + } } // Add frame of reference to reconstruct values for (int i = 0; i < vectorLen; i++) { - decodedValues[i] = deltas[i] + frameOfReference; + decodedValues[i] = deltasBuffer[i] + frameOfReference; } // Overwrite exception slots with their original values if (numExceptions > 0) { - int[] excPositions = new int[numExceptions]; for (int e = 0; e < numExceptions; e++) { - excPositions[e] = getShortLE(vectorsData, pos) & 0xFFFF; + excPositionsBuffer[e] = getShortLE(vectorsData, pos) & 0xFFFF; pos += Short.BYTES; } for (int e = 0; e < numExceptions; e++) { - decodedValues[excPositions[e]] = getIntLE(vectorsData, pos); + decodedValues[excPositionsBuffer[e]] = getIntLE(vectorsData, pos); pos += Integer.BYTES; } } } - /** Unpack bit-packed ints in groups of 8, returns position after packed data. */ private int unpackIntsWithBytePacker(ByteBuffer buf, int pos, int[] output, int count, int bitWidth) { BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); int numFullGroups = count / 8; @@ -115,14 +126,15 @@ private int unpackIntsWithBytePacker(ByteBuffer buf, int pos, int[] output, int int alreadyRead = numFullGroups * bitWidth; int partialBytes = totalPackedBytes - alreadyRead; - byte[] padded = new byte[bitWidth]; for (int i = 0; i < partialBytes; i++) { - padded[i] = buf.get(pos + i); + unpackPadBuf[i] = buf.get(pos + i); + } + for (int i = partialBytes; i < bitWidth; i++) { + unpackPadBuf[i] = 0; } - int[] temp = new int[8]; - packer.unpack8Values(padded, 0, temp, 0); - System.arraycopy(temp, 0, output, numFullGroups * 8, remaining); + packer.unpack8Values(unpackPadBuf, 0, unpackTempBuf, 0); + System.arraycopy(unpackTempBuf, 0, output, numFullGroups * 8, remaining); pos += partialBytes; } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForLong.java b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForLong.java index 3afbf0943b..e8c8fe4f1e 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForLong.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/pfor/PforValuesReaderForLong.java @@ -43,6 +43,12 @@ public class PforValuesReaderForLong extends PforValuesReader { private long[] decodedValues; + // Reusable per-vector decode buffers + private long[] deltasBuffer; + private int[] excPositionsBuffer; + private byte[] unpackPadBuf; + private long[] unpackTempBuf; + public PforValuesReaderForLong() { super(); } @@ -50,6 +56,10 @@ public PforValuesReaderForLong() { @Override protected void allocateDecodedBuffer(int capacity) { this.decodedValues = new long[capacity]; + this.deltasBuffer = new long[capacity]; + this.excPositionsBuffer = new int[capacity]; + this.unpackPadBuf = new byte[Long.SIZE]; // max bit width = 64 bytes + this.unpackTempBuf = new long[8]; } @Override @@ -74,26 +84,28 @@ protected void decodeVector(int vectorIdx) { int numExceptions = getShortLE(vectorsData, pos + 9) & 0xFFFF; pos += INT64_VECTOR_INFO_SIZE; - // Unpack bit-packed deltas - long[] deltas = new long[vectorLen]; + // Unpack bit-packed deltas into reusable buffer if (bitWidth > 0) { - pos = unpackLongsWithBytePacker(vectorsData, pos, deltas, vectorLen, bitWidth); + pos = unpackLongsWithBytePacker(vectorsData, pos, deltasBuffer, vectorLen, bitWidth); + } else { + for (int i = 0; i < vectorLen; i++) { + deltasBuffer[i] = 0; + } } // Add frame of reference to reconstruct values for (int i = 0; i < vectorLen; i++) { - decodedValues[i] = deltas[i] + frameOfReference; + decodedValues[i] = deltasBuffer[i] + frameOfReference; } // Overwrite exception slots with their original values if (numExceptions > 0) { - int[] excPositions = new int[numExceptions]; for (int e = 0; e < numExceptions; e++) { - excPositions[e] = getShortLE(vectorsData, pos) & 0xFFFF; + excPositionsBuffer[e] = getShortLE(vectorsData, pos) & 0xFFFF; pos += Short.BYTES; } for (int e = 0; e < numExceptions; e++) { - decodedValues[excPositions[e]] = getLongLE(vectorsData, pos); + decodedValues[excPositionsBuffer[e]] = getLongLE(vectorsData, pos); pos += Long.BYTES; } } @@ -109,21 +121,20 @@ private int unpackLongsWithBytePacker(ByteBuffer buf, int pos, long[] output, in pos += bitWidth; } - // Last group might have fewer than 8 values; zero-pad and unpack, - // but only advance pos by the actual bytes in the page. if (remaining > 0) { int totalPackedBytes = (count * bitWidth + 7) / 8; int alreadyRead = numFullGroups * bitWidth; int partialBytes = totalPackedBytes - alreadyRead; - byte[] padded = new byte[bitWidth]; for (int i = 0; i < partialBytes; i++) { - padded[i] = buf.get(pos + i); + unpackPadBuf[i] = buf.get(pos + i); + } + for (int i = partialBytes; i < bitWidth; i++) { + unpackPadBuf[i] = 0; } - long[] temp = new long[8]; - packer.unpack8Values(padded, 0, temp, 0); - System.arraycopy(temp, 0, output, numFullGroups * 8, remaining); + packer.unpack8Values(unpackPadBuf, 0, unpackTempBuf, 0); + System.arraycopy(unpackTempBuf, 0, output, numFullGroups * 8, remaining); pos += partialBytes; } From 680b9a9f26c0f4de2dcbcc48aae2a5cf3aa4c471 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Wed, 3 Jun 2026 19:36:51 +0000 Subject: [PATCH 8/8] Update empty-input tests for header-always-emitted behavior getBytes() now emits a valid 7-byte header even when totalCount==0, so the reader can distinguish an empty PFOR page from a missing encoding. Update assertions from size==0 to size==PFOR_HEADER_SIZE. --- .../parquet/column/values/pfor/PforValuesEndToEndTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforValuesEndToEndTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforValuesEndToEndTest.java index d1c9aa80fd..d1fa34a90c 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforValuesEndToEndTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/pfor/PforValuesEndToEndTest.java @@ -431,7 +431,8 @@ public void testIntEmptyInput() throws Exception { PforValuesWriter.IntPforValuesWriter writer = new PforValuesWriter.IntPforValuesWriter( 256, 256, new DirectByteBufferAllocator()); BytesInput bytes = writer.getBytes(); - assertEquals(0, bytes.size()); + // Empty page still emits a valid 7-byte header (numElements=0) + assertEquals(PforConstants.PFOR_HEADER_SIZE, bytes.size()); writer.close(); } @@ -440,7 +441,7 @@ public void testLongEmptyInput() throws Exception { PforValuesWriter.LongPforValuesWriter writer = new PforValuesWriter.LongPforValuesWriter( 256, 256, new DirectByteBufferAllocator()); BytesInput bytes = writer.getBytes(); - assertEquals(0, bytes.size()); + assertEquals(PforConstants.PFOR_HEADER_SIZE, bytes.size()); writer.close(); } }