From db234e4fe79ac00bb3799b90049e3c529ec6d1e7 Mon Sep 17 00:00:00 2001 From: Tushar Choudhary <151359025+tusharchou@users.noreply.github.com> Date: Sun, 15 Mar 2026 17:27:52 +0530 Subject: [PATCH 1/3] fix: dynamic_partition_overwrite now builds per-spec delete predicates after partition spec evolution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #3148 When a table has undergone partition spec evolution, its snapshot may contain manifests written under different partition_spec_ids. Previously, dynamic_partition_overwrite built the delete predicate using only the current spec, causing the manifest evaluator to incorrectly skip manifests from older specs — leaving stale data files silently behind. The fix builds the delete predicate per historical spec present in the snapshot, projecting the new data files' partition values into each spec's coordinate space before evaluating. Regression tests added covering: - Mixed-spec snapshot (manifests from both spec-0 and spec-1) - Overwrite of a partition that only exists in spec-0 manifests (silent data duplication case) --- pyiceberg/table/__init__.py | 68 ++++- .../test_manifest_pruning_spec_evolution.py | 267 ++++++++++++++++++ 2 files changed, 330 insertions(+), 5 deletions(-) create mode 100644 tests/integration/test_manifest_pruning_spec_evolution.py diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 68089beb54..0f31b4920f 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -542,12 +542,70 @@ def dynamic_partition_overwrite( ) ) - partitions_to_overwrite = {data_file.partition for data_file in data_files} - delete_filter = self._build_partition_predicate( - partition_records=partitions_to_overwrite, spec=self.table_metadata.spec(), schema=self.table_metadata.schema() - ) - self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch) + # partitions_to_overwrite = {data_file.partition 
for data_file in data_files} + # delete_filter = self._build_partition_predicate( + # partition_records=partitions_to_overwrite, spec=self.table_metadata.spec(), schema=self.table_metadata.schema() + # ) + # self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch) + + # Build the partition predicate per-spec to handle tables that have + # undergone partition spec evolution. Manifests in the snapshot may be + # written under different (older) specs. We need to project the overwrite + # partitions into each historical spec's coordinate space so that the + # manifest evaluator correctly identifies which old manifests to delete. + # See: https://github.com/apache/iceberg-python/issues/XXXX + current_spec = self.table_metadata.spec() + current_schema = self.table_metadata.schema() + # Collect the source column names (e.g. "category") that are being + # overwritten — these are stable across spec evolution (only field IDs matter). + overwrite_source_ids = {field.source_id for field in current_spec.fields} + + delete_filter: BooleanExpression = AlwaysFalse() + + # For each historical spec in the snapshot, build a predicate using + # only the fields that spec knows about, matched against the + # corresponding positions in the new data files' partition records. 
+ snapshot = self.table_metadata.snapshot_by_name(branch or MAIN_BRANCH) + if snapshot is not None: + spec_ids_in_snapshot = {m.partition_spec_id for m in snapshot.manifests(io=self._table.io)} + else: + spec_ids_in_snapshot = {current_spec.spec_id} + + for spec_id in spec_ids_in_snapshot: + historical_spec = self.table_metadata.specs()[spec_id] + # Find which fields this historical spec shares with the current spec + shared_source_ids = {f.source_id for f in historical_spec.fields} & overwrite_source_ids + if not shared_source_ids: + # No overlap — this spec's manifests cannot contain our partitions + continue + + # Project the new data files' partitions into this historical spec's space: + # for each new data file, build a partition record using only the + # fields this historical spec knows about. + historical_partitions: set[Record] = set() + for data_file in data_files: + # data_file.partition is under current_spec — extract shared field values + record_values = [] + for field in historical_spec.fields: + if field.source_id in overwrite_source_ids: + # find position of this source_id in current spec + current_pos = next(i for i, f in enumerate(current_spec.fields) if f.source_id == field.source_id) + record_values.append(data_file.partition[current_pos]) + else: + record_values.append(None) + historical_partitions.add(Record(*record_values)) + + delete_filter = Or( + delete_filter, + self._build_partition_predicate( + partition_records=historical_partitions, + spec=historical_spec, + schema=current_schema, + ), + ) + + self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch) with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files: append_files.commit_uuid = append_snapshot_commit_uuid for data_file in data_files: diff --git a/tests/integration/test_manifest_pruning_spec_evolution.py b/tests/integration/test_manifest_pruning_spec_evolution.py new file mode 100644 index 
0000000000..e7815d0794 --- /dev/null +++ b/tests/integration/test_manifest_pruning_spec_evolution.py @@ -0,0 +1,267 @@ +""" +Regression / investigation test for manifest pruning correctness under partition spec evolution. + +Context +------- +PR #3011 (merged Feb 20 2026) added manifest pruning to _OverwriteFiles and _DeleteFiles +in pyiceberg/table/update/snapshot.py. The pruning builds a partition predicate from the +*current* partition spec and evaluates it against every manifest in the snapshot via a +KeyDefaultDict of per-spec evaluators. + +The question this test file investigates: + When a table has been through partition spec evolution, its snapshot may contain manifests + written under *different* partition_spec_ids. Does the manifest evaluator correctly resolve + each manifest's own spec before deciding whether to include or skip it? + +If the answer is "no", the overwrite will silently skip manifests from the old spec, leaving +stale data files that should have been deleted -- a silent correctness bug. 
+ +How to run +---------- + pytest tests/integration/test_manifest_pruning_spec_evolution.py -v +""" + +import tempfile +from typing import Any + +import pyarrow as pa + +from pyiceberg.catalog import Catalog, load_catalog +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.transforms import IdentityTransform +from pyiceberg.types import LongType, NestedField, StringType + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +SCHEMA = Schema( + NestedField(field_id=1, name="category", field_type=StringType(), required=False), + NestedField(field_id=2, name="region", field_type=StringType(), required=False), + NestedField(field_id=3, name="value", field_type=LongType(), required=False), +) + +# Spec 0: partitioned only by category +SPEC_V0 = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="category")) + + +def make_catalog(warehouse: str) -> Catalog: + """Spin up a local SQLite-backed catalog -- no services needed.""" + return load_catalog( + "test", + type="sql", + uri=f"sqlite:///{warehouse}/catalog.db", + warehouse=f"file://{warehouse}", + ) + + +def arrow_table(rows: list[dict[str, Any]]) -> pa.Table: + return pa.Table.from_pylist( + rows, + schema=pa.schema( + [ + pa.field("category", pa.string()), + pa.field("region", pa.string()), + pa.field("value", pa.int64()), + ] + ), + ) + + +# --------------------------------------------------------------------------- +# Test 1: Mixed spec snapshot -- overwrite partition present in both specs +# --------------------------------------------------------------------------- + + +def test_overwrite_after_partition_spec_evolution_correctness() -> None: + """ + Verifies that dynamic_partition_overwrite correctly replaces ALL data files + for the target partition, including those written under a 
previous partition spec. + + Setup: + - Spec 0: partition by identity(category) + - Write A(1,2,3) and B(10,11) under spec 0 + - Evolve to spec 1: add identity(region) + - Write A(100,101) and B(200) under spec 1 + - Overwrite category=A with new rows (999, 888) + + Expected after overwrite: + - Only new A rows: values [888, 999] + - All B rows untouched: values [10, 11, 200] + - Total: 5 rows + + Bug (pre-fix): spec-0 A manifests are skipped by the evaluator, + leaving stale A rows (1, 2, 3) in the table -> 8 rows total. + """ + with tempfile.TemporaryDirectory() as warehouse: + catalog = make_catalog(warehouse) + catalog.create_namespace("default") + + # --- Step 1: create table with spec 0 --- + table = catalog.create_table( + "default.test_spec_evolution_overwrite", + schema=SCHEMA, + partition_spec=SPEC_V0, + ) + + # --- Step 2: write data under spec 0 --- + table.append( + arrow_table( + [ + {"category": "A", "region": None, "value": 1}, + {"category": "A", "region": None, "value": 2}, + {"category": "A", "region": None, "value": 3}, + {"category": "B", "region": None, "value": 10}, + {"category": "B", "region": None, "value": 11}, + ] + ) + ) + assert table.scan().to_arrow().num_rows == 5 + + # --- Step 3: evolve partition spec -- add identity(region) --- + with table.update_spec() as update: + update.add_field( + source_column_name="region", + transform=IdentityTransform(), + partition_field_name="region", + ) + table = catalog.load_table("default.test_spec_evolution_overwrite") + assert table.spec().spec_id == 1, f"Expected spec_id=1, got {table.spec().spec_id}" + + # --- Step 4: write data under spec 1 --- + table.append( + arrow_table( + [ + {"category": "A", "region": "us", "value": 100}, + {"category": "A", "region": "eu", "value": 101}, + {"category": "B", "region": "us", "value": 200}, + ] + ) + ) + assert table.scan().to_arrow().num_rows == 8 + + # Confirm mixed-spec snapshot is actually set up + current_snapshot = table.current_snapshot() + assert 
current_snapshot is not None + manifests = current_snapshot.manifests(table.io) + spec_ids_in_snapshot = {m.partition_spec_id for m in manifests} + assert len(spec_ids_in_snapshot) > 1, f"Test setup failed: expected manifests from >1 spec, got {spec_ids_in_snapshot}" + + # --- Step 5: dynamic_partition_overwrite for category=A only --- + table.dynamic_partition_overwrite( + arrow_table( + [ + {"category": "A", "region": "us", "value": 999}, + {"category": "A", "region": "eu", "value": 888}, + ] + ) + ) + + table = catalog.load_table("default.test_spec_evolution_overwrite") + result = table.scan().to_arrow().to_pydict() + + categories = result["category"] + values = result["value"] + + a_values = [v for c, v in zip(categories, values, strict=True) if c == "A"] + b_values = [v for c, v in zip(categories, values, strict=True) if c == "B"] + + # Total rows: 2 new A + 3 B = 5 + assert len(a_values) + len(b_values) == 5, ( + f"Row count mismatch: expected 5, got {len(a_values) + len(b_values)}.\n" + f"A values: {sorted(a_values)} -- stale values would be any of [1, 2, 3, 100, 101]\n" + f"B values: {sorted(b_values)}" + ) + + # A rows must be only the new ones + stale = [v for v in a_values if v in (1, 2, 3, 100, 101)] + assert not stale, ( + f"Stale A rows found (should have been deleted): {stale}\n" + f"spec-0 manifests were incorrectly skipped during manifest pruning." 
+ ) + assert sorted(a_values) == [888, 999], f"Expected A=[888,999], got {sorted(a_values)}" + + # B rows completely untouched + assert sorted(b_values) == [10, 11, 200], f"Expected B=[10,11,200], got {sorted(b_values)}" + + +# --------------------------------------------------------------------------- +# Test 2: Overwrite partition that ONLY exists in spec-0 manifests +# This is the most dangerous case -- silent data duplication, no exception raised +# --------------------------------------------------------------------------- + + +def test_overwrite_partition_only_in_old_spec() -> None: + """ + Sharpest form of the bug: the overwrite target (category=B) has data + ONLY under spec-0. After spec evolution to spec-1, overwriting B should + delete the old spec-0 B files and write new ones. + + Bug (pre-fix): the manifest evaluator, built against spec-1's predicate, + finds zero matching manifests for B (because B only exists in spec-0 + manifests) -> UserWarning "did not match any records" -> old B rows survive + -> silent data duplication: [999, 10, 11] instead of [999]. 
+ """ + with tempfile.TemporaryDirectory() as warehouse: + catalog = make_catalog(warehouse) + catalog.create_namespace("default") + + table = catalog.create_table( + "default.test_old_spec_only_overwrite", + schema=SCHEMA, + partition_spec=SPEC_V0, + ) + + # Write ONLY category=B under spec 0 + table.append( + arrow_table( + [ + {"category": "B", "region": None, "value": 10}, + {"category": "B", "region": None, "value": 11}, + ] + ) + ) + + # Evolve spec -- add identity(region) + with table.update_spec() as update: + update.add_field( + source_column_name="region", + transform=IdentityTransform(), + partition_field_name="region", + ) + table = catalog.load_table("default.test_old_spec_only_overwrite") + + # Write ONLY category=A under spec 1 (B has no spec-1 data) + table.append( + arrow_table( + [ + {"category": "A", "region": "us", "value": 100}, + ] + ) + ) + + # Overwrite category=B -- it only exists in spec-0 manifests + table.dynamic_partition_overwrite( + arrow_table( + [ + {"category": "B", "region": "us", "value": 999}, + ] + ) + ) + + table = catalog.load_table("default.test_old_spec_only_overwrite") + result = table.scan().to_arrow().to_pydict() + + categories = result["category"] + values = result["value"] + + b_values = [v for c, v in zip(categories, values, strict=True) if c == "B"] + a_values = [v for c, v in zip(categories, values, strict=True) if c == "A"] + + assert b_values == [999], ( + f"Expected B=[999] only, got {b_values}.\n" + f"Stale rows {[v for v in b_values if v != 999]} were not deleted -- " + f"spec-0 manifests were incorrectly skipped." 
+ ) + assert a_values == [100], f"A data unexpectedly modified: {a_values}" From 1931bcb5d593b268480be085a3b3ac99ae2012fc Mon Sep 17 00:00:00 2001 From: Tushar Choudhary <151359025+tusharchou@users.noreply.github.com> Date: Sun, 15 Mar 2026 17:35:30 +0530 Subject: [PATCH 2/3] fix: add Apache license header to test file --- .../test_manifest_pruning_spec_evolution.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/integration/test_manifest_pruning_spec_evolution.py b/tests/integration/test_manifest_pruning_spec_evolution.py index e7815d0794..d836e5be4a 100644 --- a/tests/integration/test_manifest_pruning_spec_evolution.py +++ b/tests/integration/test_manifest_pruning_spec_evolution.py @@ -1,3 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. """ Regression / investigation test for manifest pruning correctness under partition spec evolution. 
From 346387b7af89d89e840ece2cdaa0784aa718 Mon Sep 17 00:00:00 2001 From: Tushar Choudhary <151359025+tusharchou@users.noreply.github.com> Date: Sun, 15 Mar 2026 21:44:04 +0530 Subject: [PATCH 3/3] fix inline comment --- pyiceberg/table/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 0f31b4920f..4c624ff4e3 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -553,7 +553,6 @@ def dynamic_partition_overwrite( # written under different (older) specs. We need to project the overwrite # partitions into each historical spec's coordinate space so that the # manifest evaluator correctly identifies which old manifests to delete. - # See: https://github.com/apache/iceberg-python/issues/XXXX current_spec = self.table_metadata.spec() current_schema = self.table_metadata.schema() # Collect the source column names (e.g. "category") that are being