Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForInteger;
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForLong;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
import org.apache.parquet.column.values.pfor.PforValuesReaderForInt;
import org.apache.parquet.column.values.pfor.PforValuesReaderForLong;
import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader;
import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
import org.apache.parquet.column.values.dictionary.DictionaryValuesReader;
Expand Down Expand Up @@ -147,6 +149,27 @@ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valu
}
},

/**
 * Patched Frame of Reference (PFOR) encoding, available for INT32 and INT64 columns only.
 * The encoding subtracts the minimum value, picks a bit width through a cost model,
 * bit-packs the resulting deltas, and keeps outlier values as separately stored exceptions.
 */
PFOR {
  @Override
  public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
    // Dispatch on the column's physical type; supported types return straight from the switch.
    switch (descriptor.getType()) {
      case INT32:
        return new PforValuesReaderForInt();
      case INT64:
        return new PforValuesReaderForLong();
      default:
        break;
    }
    // Reaching this point means the column type has no PFOR reader.
    throw new ParquetDecodingException(
        "PFOR encoding is only supported for INT32 and INT64, not " + descriptor.getType());
  }
},

/**
* @deprecated This is no longer used, and has been replaced by {@link #RLE}
* which is combination of bit packing and rle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class ParquetProperties {
public static final int DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
public static final boolean DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED = false;
public static final boolean DEFAULT_IS_PFOR_ENABLED = false;
public static final WriterVersion DEFAULT_WRITER_VERSION = WriterVersion.PARQUET_1_0;
public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true;
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
Expand Down Expand Up @@ -132,6 +133,7 @@ public static WriterVersion fromString(String name) {
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;
private final ColumnProperty<ByteStreamSplitMode> byteStreamSplitEnabled;
private final ColumnProperty<Boolean> pforEnabled;
private final Map<String, String> extraMetaData;
private final ColumnProperty<Boolean> statistics;
private final ColumnProperty<Boolean> sizeStatistics;
Expand Down Expand Up @@ -164,6 +166,7 @@ private ParquetProperties(Builder builder) {
this.pageRowCountLimit = builder.pageRowCountLimit;
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build();
this.pforEnabled = builder.pforEnabled.build();
this.extraMetaData = builder.extraMetaData;
this.statistics = builder.statistics.build();
this.sizeStatistics = builder.sizeStatistics.build();
Expand Down Expand Up @@ -259,6 +262,23 @@ public boolean isByteStreamSplitEnabled(ColumnDescriptor column) {
}
}

/**
 * Tells whether PFOR encoding should be used for the given column.
 * Only INT32 and INT64 columns consult the per-column setting; every other
 * primitive type yields {@code false} unconditionally.
 *
 * @param column the column descriptor
 * @return true if PFOR encoding is enabled for this column
 */
public boolean isPforEnabled(ColumnDescriptor column) {
  // Guard: bail out early for any type PFOR cannot encode.
  switch (column.getPrimitiveType().getPrimitiveTypeName()) {
    case INT32:
    case INT64:
      break;
    default:
      return false;
  }
  return pforEnabled.getValue(column);
}

/**
 * Returns the allocator held by these properties.
 *
 * @return the {@link ByteBufferAllocator} stored in this instance
 */
public ByteBufferAllocator getAllocator() {
  return this.allocator;
}
Expand Down Expand Up @@ -416,6 +436,7 @@ public static class Builder {
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
private final ColumnProperty.Builder<ByteStreamSplitMode> byteStreamSplitEnabled;
private final ColumnProperty.Builder<Boolean> pforEnabled;
private Map<String, String> extraMetaData = new HashMap<>();
private final ColumnProperty.Builder<Boolean> statistics;
private final ColumnProperty.Builder<Boolean> sizeStatistics;
Expand All @@ -427,6 +448,7 @@ private Builder() {
DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED
? ByteStreamSplitMode.FLOATING_POINT
: ByteStreamSplitMode.NONE);
pforEnabled = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_PFOR_ENABLED);
bloomFilterEnabled = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_ENABLED);
bloomFilterNDVs = ColumnProperty.<Long>builder().withDefaultValue(null);
bloomFilterFPPs = ColumnProperty.<Double>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_FPP);
Expand Down Expand Up @@ -457,6 +479,7 @@ private Builder(ParquetProperties toCopy) {
this.numBloomFilterCandidates = ColumnProperty.builder(toCopy.numBloomFilterCandidates);
this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
this.byteStreamSplitEnabled = ColumnProperty.builder(toCopy.byteStreamSplitEnabled);
this.pforEnabled = ColumnProperty.builder(toCopy.pforEnabled);
this.extraMetaData = toCopy.extraMetaData;
this.statistics = ColumnProperty.builder(toCopy.statistics);
this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics);
Expand Down Expand Up @@ -534,6 +557,29 @@ public Builder withExtendedByteStreamSplitEncoding(boolean enable) {
return this;
}

/**
 * Sets the default PFOR encoding switch, which applies to INT32 and INT64 columns.
 *
 * @param enable {@code true} to turn PFOR encoding on, {@code false} to turn it off
 * @return this builder for method chaining.
 */
public Builder withPforEncoding(boolean enable) {
  pforEnabled.withDefaultValue(enable);
  return this;
}

/**
 * Sets the PFOR encoding switch for a single column, identified by its path.
 *
 * @param columnPath the path of the column (dot-string)
 * @param enable {@code true} to turn PFOR encoding on for that column, {@code false} to turn it off
 * @return this builder for method chaining.
 */
public Builder withPforEncoding(String columnPath, boolean enable) {
  pforEnabled.withValue(columnPath, enable);
  return this;
}

/**
* Set the Parquet format dictionary page size.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesWriter;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
import org.apache.parquet.column.values.pfor.PforValuesWriter;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
Expand Down Expand Up @@ -115,7 +116,12 @@ private ValuesWriter getBinaryValuesWriter(ColumnDescriptor path) {

private ValuesWriter getInt32ValuesWriter(ColumnDescriptor path) {
final ValuesWriter fallbackWriter;
if (parquetProperties.isByteStreamSplitEnabled(path)) {
if (this.parquetProperties.isPforEnabled(path)) {
fallbackWriter = new PforValuesWriter.IntPforValuesWriter(
parquetProperties.getInitialSlabSize(),
parquetProperties.getPageSizeThreshold(),
parquetProperties.getAllocator());
} else if (parquetProperties.isByteStreamSplitEnabled(path)) {
fallbackWriter = new ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter(
parquetProperties.getInitialSlabSize(),
parquetProperties.getPageSizeThreshold(),
Expand All @@ -132,7 +138,12 @@ private ValuesWriter getInt32ValuesWriter(ColumnDescriptor path) {

private ValuesWriter getInt64ValuesWriter(ColumnDescriptor path) {
final ValuesWriter fallbackWriter;
if (parquetProperties.isByteStreamSplitEnabled(path)) {
if (this.parquetProperties.isPforEnabled(path)) {
fallbackWriter = new PforValuesWriter.LongPforValuesWriter(
parquetProperties.getInitialSlabSize(),
parquetProperties.getPageSizeThreshold(),
parquetProperties.getAllocator());
} else if (parquetProperties.isByteStreamSplitEnabled(path)) {
fallbackWriter = new ByteStreamSplitValuesWriter.LongByteStreamSplitValuesWriter(
parquetProperties.getInitialSlabSize(),
parquetProperties.getPageSizeThreshold(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.column.values.pfor;

import org.apache.parquet.Preconditions;

/**
 * Shared constants and validation helpers for the PFOR (Patched Frame of Reference) encoding.
 *
 * <p>PFOR compresses integer columns (INT32/INT64) in four steps:
 * <ol>
 * <li>Subtract the minimum value (the frame of reference)</li>
 * <li>Pick a bit width with a cost model</li>
 * <li>Bit-pack the deltas at that width</li>
 * <li>Store outlier values (exceptions) separately, together with their positions</li>
 * </ol>
 */
public final class PforConstants {

  // ---- Page header (7 bytes total) ----
  public static final int PFOR_PACKING_MODE_FOR = 0;
  public static final int PFOR_HEADER_SIZE = 7;

  // ---- Vector sizing ----
  // The default size is derived from its log2 so the two values cannot drift apart.
  public static final int DEFAULT_VECTOR_SIZE_LOG = 10;
  public static final int DEFAULT_VECTOR_SIZE = 1 << DEFAULT_VECTOR_SIZE_LOG;

  // Lower and upper bounds on log2(vectorSize).
  // The upper bound is 15 (vectorSize=32768) because num_exceptions is a uint16:
  // vectorSize must stay at or below 65535 so that a vector made up entirely of
  // exceptions cannot overflow the counter.
  static final int MIN_LOG_VECTOR_SIZE = 3;
  static final int MAX_LOG_VECTOR_SIZE = 15;

  // Largest exception count a single vector can record (uint16).
  public static final int MAX_EXCEPTIONS = 65535;

  // ---- Per-type layout, in bytes ----
  // Width of one raw value.
  public static final int INT32_VALUE_BYTE_WIDTH = 4;
  public static final int INT64_VALUE_BYTE_WIDTH = 8;

  // Per-vector metadata size.
  // INT32: frame_of_reference(4) + bit_width(1) + num_exceptions(2) = 7
  public static final int INT32_VECTOR_INFO_SIZE = 7;
  // INT64: frame_of_reference(8) + bit_width(1) + num_exceptions(2) = 11
  public static final int INT64_VECTOR_INFO_SIZE = 11;

  private PforConstants() {
    // Not instantiable: this class only holds constants and static helpers.
  }

  /**
   * Checks that a vector size is a power of two whose log2 lies in
   * [{@link #MIN_LOG_VECTOR_SIZE}, {@link #MAX_LOG_VECTOR_SIZE}].
   *
   * @param vectorSize the candidate vector size
   * @return {@code vectorSize} unchanged when both checks pass
   */
  static int validateVectorSize(int vectorSize) {
    // A positive int with exactly one bit set is a power of two.
    final boolean powerOfTwo = vectorSize > 0 && Integer.bitCount(vectorSize) == 1;
    Preconditions.checkArgument(powerOfTwo, "Vector size must be a power of 2, got: %s", vectorSize);

    // For a power of two, the trailing-zero count is its log2.
    final int log2 = Integer.numberOfTrailingZeros(vectorSize);
    final boolean withinBounds = MIN_LOG_VECTOR_SIZE <= log2 && log2 <= MAX_LOG_VECTOR_SIZE;
    Preconditions.checkArgument(
        withinBounds,
        "Vector size log2 must be between %s and %s, got: %s (vectorSize=%s)",
        MIN_LOG_VECTOR_SIZE,
        MAX_LOG_VECTOR_SIZE,
        log2,
        vectorSize);
    return vectorSize;
  }
}
Loading