diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index e9ebed2826f4..1727a2b7dd3c 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -252,7 +252,14 @@ private static Types.NestedField getPhysicalType( // Use FixedSizeBinaryVector for binary backed decimal type = Types.FixedType.ofLength(primitive.getTypeLength()); } - physicalType = Types.NestedField.from(logicalType).ofType(type).build(); + // drop initialDefault/writeDefault: they are typed for the logical (decimal) type and + // cannot be cast to the underlying physical type + physicalType = + Types.NestedField.from(logicalType) + .ofType(type) + .withInitialDefault(null) + .withWriteDefault(null) + .build(); } return physicalType; diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestVectorizedDefaultValues.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestVectorizedDefaultValues.java new file mode 100644 index 000000000000..5b50168d6167 --- /dev/null +++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestVectorizedDefaultValues.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.arrow.vectorized; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.math.BigDecimal; +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.parquet.hadoop.ParquetOutputFormat; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Vectorized-read tests focused on Iceberg field defaults. The reader has two paths that interact + * with defaults: + * + * (1) presumably columns missing from the data file, filled from their default (TODO confirm); + * (2) columns present in the data file, which pass through {@code getPhysicalType}. + *

<p>These tests exercise the second path. The bug only surfaces when the column is not + * dictionary-encoded — with dictionary encoding {@code allocateDictEncodedVector} is used and + * {@code getPhysicalType} is bypassed. So the parquet file is written with dictionary encoding + * disabled. + */ +public class TestVectorizedDefaultValues { + + @TempDir private File tempDir; + + @Test + public void testDecimalWithDefaultValueNotDictionaryEncoded() throws Exception { + Schema schema = /* one decimal column per precision (5, 15 and 25 digits), each carrying defaults */ + new Schema( + Types.NestedField.required("id").withId(1).ofType(Types.LongType.get()).build(), + Types.NestedField.optional("int_backed") + .withId(2) + .ofType(Types.DecimalType.of(5, 2)) + .withInitialDefault(Literal.of(new BigDecimal("0.00"))) + .withWriteDefault(Literal.of(new BigDecimal("0.00"))) + .build(), + Types.NestedField.optional("long_backed") + .withId(3) + .ofType(Types.DecimalType.of(15, 2)) + .withInitialDefault(Literal.of(new BigDecimal("0.00"))) + .withWriteDefault(Literal.of(new BigDecimal("0.00"))) + .build(), + Types.NestedField.optional("fixed_backed") + .withId(4) + .ofType(Types.DecimalType.of(25, 2)) + .withInitialDefault(Literal.of(new BigDecimal("0.00"))) + .withWriteDefault(Literal.of(new BigDecimal("0.00"))) + .build()); + + HadoopTables tables = new HadoopTables(); + Table table = + tables.create( + schema, + PartitionSpec.unpartitioned(), + ImmutableMap.of(TableProperties.FORMAT_VERSION, "3"), /* presumably v3 is needed for field defaults — TODO confirm */ + tempDir.toURI().toString()); + + List<GenericRecord> records = Lists.newArrayList(); + GenericRecord template = GenericRecord.create(schema); + for (long i = 0; i < 5; i++) { /* every row sets all four fields, so no value is left unset */ + GenericRecord rec = template.copy(); + rec.setField("id", i); + rec.setField("int_backed", new BigDecimal("12.34")); + rec.setField("long_backed", new BigDecimal("1234567890.12")); + rec.setField("fixed_backed", new BigDecimal("1234567890123456789.12")); + records.add(rec); + } + + File dataFile = new File(tempDir, "decimal-no-dict.parquet"); + try (FileAppender<GenericRecord> writer = +
Parquet.write(Files.localOutput(dataFile)) + .schema(schema) + .createWriterFunc(GenericParquetWriter::create) + .set(ParquetOutputFormat.ENABLE_DICTIONARY, "false") /* keep the columns off the dictionary-encoded path */ + .build()) { + writer.addAll(records); + } + + DataFile parquetFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(dataFile.getAbsolutePath()) + .withFileSizeInBytes(dataFile.length()) + .withFormat(FileFormat.PARQUET) + .withRecordCount(records.size()) + .build(); + table.newAppend().appendFile(parquetFile).commit(); + + int rowsRead = 0; + try (VectorizedTableScanIterable reader = + new VectorizedTableScanIterable(table.newScan(), 1024, false)) { + for (ColumnarBatch batch : reader) { + rowsRead += batch.numRows(); + } + } + + assertThat(rowsRead).isEqualTo(records.size()); /* only the row count is checked, not the values */ + } +} diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java index 6011c6dad7d2..4a24a725e0f5 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java @@ -28,6 +28,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.math.BigDecimal; import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Path; @@ -45,6 +46,7 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; @@ -65,6 +67,7 @@ import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import 
org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.hadoop.ParquetOutputFormat; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; @@ -254,6 +257,16 @@ FileAppender getParquetV2Writer(Schema schema, File testFile) throws IOE .build(); } + /** Builds a Parquet appender for {@code testFile} that has dictionary encoding disabled. */ FileAppender<Record> parquetWriterWithoutDictionary(Schema schema, File testFile) + throws IOException { + return Parquet.write(Files.localOutput(testFile)) + .schema(schema) + .createWriterFunc(GenericParquetWriter::create) + .named("test") + .set(ParquetOutputFormat.ENABLE_DICTIONARY, "false") + .build(); + } + void assertRecordsMatch( Schema schema, int expectedSize, @@ -460,6 +473,39 @@ public void testUuidReads() throws Exception { assertRecordsMatch(schema, numRows, data, dataFile, false, BATCH_SIZE); } + @Test + public void testDecimalWithDefaultValueNotDictionaryEncoded() throws Exception { /* decimals of precision 5, 15 and 25 that carry defaults, written without dictionary encoding */ + Schema schema = + new Schema( + required(100, "id", Types.LongType.get()), + Types.NestedField.optional("int_backed") + .withId(101) + .ofType(Types.DecimalType.of(5, 2)) + .withInitialDefault(Literal.of(new BigDecimal("0.00"))) + .withWriteDefault(Literal.of(new BigDecimal("0.00"))) + .build(), + Types.NestedField.optional("long_backed") + .withId(102) + .ofType(Types.DecimalType.of(15, 2)) + .withInitialDefault(Literal.of(new BigDecimal("0.00"))) + .withWriteDefault(Literal.of(new BigDecimal("0.00"))) + .build(), + Types.NestedField.optional("fixed_backed") + .withId(103) + .ofType(Types.DecimalType.of(25, 2)) + .withInitialDefault(Literal.of(new BigDecimal("0.00"))) + .withWriteDefault(Literal.of(new BigDecimal("0.00"))) + .build()); + + File dataFile = temp.resolve("decimal-no-dict.parquet").toFile(); + Iterable<Record> data = generateData(schema, 1000, 0L, 0.0f, IDENTITY); /* 0.0f is presumably the null fraction, i.e. no nulls — TODO confirm */ + try (FileAppender<Record> writer = parquetWriterWithoutDictionary(schema, dataFile)) { + writer.addAll(data); + } + + assertRecordsMatch(schema, 1000, data, dataFile, false, 
BATCH_SIZE); + } + private void assertIdenticalFileContents( File actual, File expected, Schema schema, boolean vectorized) throws IOException { try (CloseableIterable expectedIterator =