From 3fcc6c8a5d5e8ebe2c56edd66cc4530a13952b7a Mon Sep 17 00:00:00 2001 From: Anupam Yadav Date: Mon, 11 May 2026 20:57:18 +0000 Subject: [PATCH] PARQUET-3479: Add configuration to disable early dictionary compression check --- .../parquet/column/ParquetProperties.java | 29 ++++ .../factory/DefaultValuesWriterFactory.java | 4 +- .../values/fallback/FallbackValuesWriter.java | 39 +++-- .../fallback/TestFallbackValuesWriter.java | 100 +++++++++++++ .../parquet/hadoop/ParquetOutputFormat.java | 10 ++ .../apache/parquet/hadoop/ParquetWriter.java | 11 ++ .../hadoop/TestDictionaryEarlyCheck.java | 136 ++++++++++++++++++ 7 files changed, 318 insertions(+), 11 deletions(-) create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/fallback/TestFallbackValuesWriter.java create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDictionaryEarlyCheck.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index f29214b458..7b55b740c3 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -67,6 +67,7 @@ public class ParquetProperties { public static final boolean DEFAULT_STATISTICS_ENABLED = true; public static final boolean DEFAULT_SIZE_STATISTICS_ENABLED = true; + public static final long DEFAULT_DICTIONARY_CHECK_AFTER_BYTES = 0; public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true; /** @@ -131,6 +132,7 @@ public static WriterVersion fromString(String name) { private final int rowGroupRowCountLimit; private final int pageRowCountLimit; private final boolean pageWriteChecksumEnabled; + private final long dictionaryCheckAfterBytes; private final ColumnProperty byteStreamSplitEnabled; private final Map extraMetaData; private final ColumnProperty statistics; @@ -163,6 +165,7 @@ 
private ParquetProperties(Builder builder) { this.rowGroupRowCountLimit = builder.rowGroupRowCountLimit; this.pageRowCountLimit = builder.pageRowCountLimit; this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled; + this.dictionaryCheckAfterBytes = builder.dictionaryCheckAfterBytes; this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build(); this.extraMetaData = builder.extraMetaData; this.statistics = builder.statistics.build(); @@ -322,6 +325,17 @@ public boolean getPageWriteChecksumEnabled() { return pageWriteChecksumEnabled; } + /** + * Returns the byte threshold after which the dictionary compression check is performed. + * A value of 0 means check on the first page (backward compatible default). Higher values + * delay the check until that many raw bytes have been accumulated across pages. + * + * @return the byte threshold for the dictionary compression check + */ + public long getDictionaryCheckAfterBytes() { + return dictionaryCheckAfterBytes; + } + public OptionalLong getBloomFilterNDV(ColumnDescriptor column) { Long ndv = bloomFilterNDVs.getValue(column); return ndv == null ? 
OptionalLong.empty() : OptionalLong.of(ndv); @@ -415,6 +429,7 @@ public static class Builder { private int rowGroupRowCountLimit = DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT; private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT; private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED; + private long dictionaryCheckAfterBytes = DEFAULT_DICTIONARY_CHECK_AFTER_BYTES; private final ColumnProperty.Builder byteStreamSplitEnabled; private Map extraMetaData = new HashMap<>(); private final ColumnProperty.Builder statistics; @@ -450,6 +465,7 @@ private Builder(ParquetProperties toCopy) { this.allocator = toCopy.allocator; this.pageRowCountLimit = toCopy.pageRowCountLimit; this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled; + this.dictionaryCheckAfterBytes = toCopy.dictionaryCheckAfterBytes; this.bloomFilterNDVs = ColumnProperty.builder(toCopy.bloomFilterNDVs); this.bloomFilterFPPs = ColumnProperty.builder(toCopy.bloomFilterFPPs); this.bloomFilterEnabled = ColumnProperty.builder(toCopy.bloomFilterEnabled); @@ -709,6 +725,19 @@ public Builder withPageWriteChecksumEnabled(boolean val) { return this; } + /** + * Set the raw data byte threshold after which the dictionary compression check is performed. + * A value of 0 means check on the first page (backward compatible default). Higher values + * delay the check until that many raw bytes have been accumulated across pages. 
+ * + * @param val byte threshold (default: 0) + * @return this builder for method chaining + */ + public Builder withDictionaryCheckAfterBytes(long val) { + this.dictionaryCheckAfterBytes = val; + return this; + } + public Builder withExtraMetaData(Map extraMetaData) { this.extraMetaData = extraMetaData; return this; diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java index 4c03e6b65e..55f02c8aff 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java @@ -111,7 +111,9 @@ static ValuesWriter dictWriterWithFallBack( ValuesWriter writerToFallBackTo) { if (parquetProperties.isDictionaryEnabled(path)) { return FallbackValuesWriter.of( - dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding), writerToFallBackTo); + dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding), + writerToFallBackTo, + parquetProperties.getDictionaryCheckAfterBytes()); } else { return writerToFallBackTo; } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java index 7f56ef2192..02de54de61 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java @@ -30,7 +30,12 @@ public class FallbackValuesWriter FallbackValuesWriter of( I initialWriter, F fallBackWriter) { - return new FallbackValuesWriter<>(initialWriter, fallBackWriter); + return new FallbackValuesWriter<>(initialWriter, fallBackWriter, 0); + } + + public static 
FallbackValuesWriter of( + I initialWriter, F fallBackWriter, long checkAfterBytes) { + return new FallbackValuesWriter<>(initialWriter, fallBackWriter, checkAfterBytes); } /** @@ -44,6 +49,14 @@ public static = checkAfterBytes) { + compressionChecked = true; BytesInput bytes = initialWriter.getBytes(); if (!initialWriter.isCompressionSatisfying(rawDataByteSize, bytes.size())) { fallBack(); @@ -103,7 +116,6 @@ public Encoding getEncoding() { @Override public void reset() { rawDataByteSize = 0; - firstPage = false; currentWriter.reset(); } @@ -131,8 +143,9 @@ public void resetDictionary() { } currentWriter = initialWriter; fellBackAlready = false; + compressionChecked = false; + cumulativeRawBytes = 0; initialUsedAndHadDictionary = false; - firstPage = true; } @Override @@ -167,6 +180,7 @@ private void fallBack() { @Override public void writeByte(int value) { rawDataByteSize += 1; + cumulativeRawBytes += 1; currentWriter.writeByte(value); checkFallback(); } @@ -175,6 +189,7 @@ public void writeByte(int value) { public void writeBytes(Binary v) { // for rawdata, length(4 bytes int) is stored, followed by the binary content itself rawDataByteSize += v.length() + 4; + cumulativeRawBytes += v.length() + 4; currentWriter.writeBytes(v); checkFallback(); } @@ -182,6 +197,7 @@ public void writeBytes(Binary v) { @Override public void writeInteger(int v) { rawDataByteSize += 4; + cumulativeRawBytes += 4; currentWriter.writeInteger(v); checkFallback(); } @@ -189,6 +205,7 @@ public void writeInteger(int v) { @Override public void writeLong(long v) { rawDataByteSize += 8; + cumulativeRawBytes += 8; currentWriter.writeLong(v); checkFallback(); } @@ -196,6 +213,7 @@ public void writeLong(long v) { @Override public void writeFloat(float v) { rawDataByteSize += 4; + cumulativeRawBytes += 4; currentWriter.writeFloat(v); checkFallback(); } @@ -203,6 +221,7 @@ public void writeFloat(float v) { @Override public void writeDouble(double v) { rawDataByteSize += 8; + cumulativeRawBytes += 
8; currentWriter.writeDouble(v); checkFallback(); } diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/fallback/TestFallbackValuesWriter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/fallback/TestFallbackValuesWriter.java new file mode 100644 index 0000000000..898fdb7531 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/fallback/TestFallbackValuesWriter.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.column.values.fallback; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.parquet.bytes.DirectByteBufferAllocator; +import org.apache.parquet.bytes.TrackingByteBufferAllocator; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter; +import org.apache.parquet.column.values.plain.PlainValuesWriter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestFallbackValuesWriter { + + private TrackingByteBufferAllocator allocator; + + @Before + public void initAllocator() { + allocator = TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator()); + } + + @After + public void closeAllocator() { + allocator.close(); + } + + /** + * With threshold=0, the check fires on the first page and falls back for high-cardinality data. + */ + @Test + public void testThresholdZeroFallsBackImmediately() throws Exception { + int dictPageSize = 1024 * 1024; + + PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter( + dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator); + PlainValuesWriter plainWriter = new PlainValuesWriter(1024, 1024 * 1024, allocator); + FallbackValuesWriter writer = + FallbackValuesWriter.of(dictWriter, plainWriter, 0); + + try { + for (int i = 0; i < 1000; i++) { + writer.writeInteger(i); + } + writer.getBytes(); + + assertFalse( + "Should fall back to plain encoding with threshold=0 and high cardinality", + writer.getEncoding().usesDictionary()); + } finally { + writer.close(); + } + } + + /** + * With a large threshold, the check never fires and dictionary encoding is preserved. 
+ */ + @Test + public void testLargeThresholdPreservesDictionary() throws Exception { + int dictPageSize = 1024 * 1024; + + PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter( + dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator); + PlainValuesWriter plainWriter = new PlainValuesWriter(1024, 1024 * 1024, allocator); + FallbackValuesWriter writer = + FallbackValuesWriter.of(dictWriter, plainWriter, Long.MAX_VALUE); + + try { + for (int i = 0; i < 1000; i++) { + writer.writeInteger(i); + } + writer.getBytes(); + + assertTrue( + "Dictionary encoding should be preserved with large threshold", + writer.getEncoding().usesDictionary()); + } finally { + writer.close(); + } + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index 868ae634c1..9f37aac622 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -161,6 +161,7 @@ public static enum JobSummaryLevel { public static final String BLOCK_ROW_COUNT_LIMIT = "parquet.block.row.count.limit"; public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit"; public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled"; + public static final String DICTIONARY_CHECK_AFTER_BYTES = "parquet.dictionary.check.after.raw.bytes"; public static final String STATISTICS_ENABLED = "parquet.column.statistics.enabled"; public static final String SIZE_STATISTICS_ENABLED = "parquet.size.statistics.enabled"; @@ -412,6 +413,14 @@ public static boolean getPageWriteChecksumEnabled(Configuration conf) { return conf.getBoolean(PAGE_WRITE_CHECKSUM_ENABLED, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED); } + public static void setDictionaryCheckAfterBytes(Configuration conf, 
long val) { + conf.setLong(DICTIONARY_CHECK_AFTER_BYTES, val); + } + + public static long getDictionaryCheckAfterBytes(Configuration conf) { + return conf.getLong(DICTIONARY_CHECK_AFTER_BYTES, ParquetProperties.DEFAULT_DICTIONARY_CHECK_AFTER_BYTES); + } + public static void setStatisticsEnabled(JobContext jobContext, boolean enabled) { getConfiguration(jobContext).setBoolean(STATISTICS_ENABLED, enabled); } @@ -526,6 +535,7 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp .withRowGroupRowCountLimit(getBlockRowCountLimit(conf)) .withPageRowCountLimit(getPageRowCountLimit(conf)) .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf)) + .withDictionaryCheckAfterBytes(getDictionaryCheckAfterBytes(conf)) .withStatisticsEnabled(getStatisticsEnabled(conf)); new ColumnConfigParser() .withColumnConfig( diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 8eb5f7f17b..d755597015 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -771,6 +771,17 @@ public SELF withPageWriteChecksumEnabled(boolean enablePageWriteChecksum) { return self(); } + /** + * Set the raw data byte threshold after which the dictionary compression check is performed. + * + * @param val byte threshold (0 means check on the first page) + * @return this builder for method chaining. + */ + public SELF withDictionaryCheckAfterBytes(long val) { + encodingPropsBuilder.withDictionaryCheckAfterBytes(val); + return self(); + } + /** * Set max Bloom filter bytes for related columns. 
* diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDictionaryEarlyCheck.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDictionaryEarlyCheck.java new file mode 100644 index 0000000000..74e03be24f --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDictionaryEarlyCheck.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.hadoop; + +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; +import static org.apache.parquet.hadoop.ParquetFileReader.readFooter; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.bytes.TrackingByteBufferAllocator; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Integration test verifying that the dictionary check threshold affects encoding in written Parquet files. 
+ */ +public class TestDictionaryEarlyCheck { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private TrackingByteBufferAllocator allocator; + + @Before + public void initAllocator() { + allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator()); + } + + @After + public void closeAllocator() { + allocator.close(); + } + + @Test + public void testLargeThresholdPreservesDictionaryInFile() throws IOException { + Configuration conf = new Configuration(); + MessageType schema = MessageTypeParser.parseMessageType("message test { required int32 val; }"); + GroupWriteSupport.setSchema(schema, conf); + SimpleGroupFactory factory = new SimpleGroupFactory(schema); + + Path file = new Path(temp.getRoot().toString(), "large_threshold.parquet"); + + ParquetWriter writer = ExampleParquetWriter.builder(new Path(file.toString())) + .withAllocator(allocator) + .withCompressionCodec(UNCOMPRESSED) + .withDictionaryPageSize(1024 * 1024) + .enableDictionaryEncoding() + .withDictionaryCheckAfterBytes(Long.MAX_VALUE) + .withWriterVersion(org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0) + .withConf(conf) + .build(); + + // Write 1000 distinct values — would normally trigger fallback with threshold=0 + for (int i = 0; i < 1000; i++) { + writer.write(factory.newGroup().append("val", i)); + } + writer.close(); + + ParquetMetadata footer = readFooter(conf, file, NO_FILTER); + for (BlockMetaData block : footer.getBlocks()) { + for (ColumnChunkMetaData column : block.getColumns()) { + assertTrue( + "Dictionary encoding should be preserved with large threshold, got: " + column.getEncodings(), + column.getEncodings().contains(Encoding.PLAIN_DICTIONARY)); + } + } + } + + @Test + public void testZeroThresholdFallsBackInFile() throws IOException { + Configuration conf = new Configuration(); + MessageType schema = MessageTypeParser.parseMessageType("message test { required int32 val; }"); + GroupWriteSupport.setSchema(schema, conf); + 
SimpleGroupFactory factory = new SimpleGroupFactory(schema); + + Path file = new Path(temp.getRoot().toString(), "zero_threshold.parquet"); + + ParquetWriter writer = ExampleParquetWriter.builder(new Path(file.toString())) + .withAllocator(allocator) + .withCompressionCodec(UNCOMPRESSED) + .withDictionaryPageSize(1024 * 1024) + .enableDictionaryEncoding() + .withDictionaryCheckAfterBytes(0) + .withWriterVersion(org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0) + .withConf(conf) + .build(); + + for (int i = 0; i < 1000; i++) { + writer.write(factory.newGroup().append("val", i)); + } + writer.close(); + + ParquetMetadata footer = readFooter(conf, file, NO_FILTER); + for (BlockMetaData block : footer.getBlocks()) { + for (ColumnChunkMetaData column : block.getColumns()) { + assertFalse( + "Should fall back from dictionary with threshold=0, got: " + column.getEncodings(), + column.getEncodings().contains(Encoding.PLAIN_DICTIONARY)); + } + } + } +}