feat: Add table.maintenance.compact() for full-table data file compaction #3124

Open

qzyu999 wants to merge 7 commits into apache:main from qzyu999:feat-compaction-issue-1092

Conversation

@qzyu999 commented Mar 6, 2026

Closes #1092

Rationale for this change

This introduces a simplified, whole-table compaction strategy via the MaintenanceTable API (table.maintenance.compact()).

Key implementation details:

  • Reads the entire table state into memory via .to_arrow().
    • Note: This initial implementation uses an in-memory Arrow-based rewrite strategy. Future iterations can extend this to support streaming or distributed rewrites for larger-than-memory datasets.
  • Uses table.overwrite() to rewrite data, leveraging PyIceberg's target file bin-packing (write.target-file-size-bytes) natively.
  • Ensures atomicity by executing within a table transaction.
  • Explicitly sets snapshot-type: replace and replace-operation: compaction to ensure correct metadata history for downstream engines.
  • Includes a guard to safely ignore compaction requests on empty tables.
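
The control flow described above can be sketched with plain-Python stand-ins (the table, transaction, and write machinery here are hypothetical placeholders, not PyIceberg's real classes):

```python
def compact(rows, write_fn):
    """Hedged sketch of the described flow: read everything, guard
    against empty tables, then rewrite atomically with REPLACE metadata."""
    if not rows:  # guard: safely ignore compaction requests on empty tables
        return None
    snapshot_properties = {
        "snapshot-type": "replace",
        "replace-operation": "compaction",
    }
    # In PyIceberg this step would run inside a transaction, roughly:
    #   with tbl.transaction() as txn:
    #       txn.overwrite(arrow_table, snapshot_properties=snapshot_properties)
    return write_fn(rows, snapshot_properties)

result = compact([{"id": 1}], lambda rows, props: (len(rows), props["snapshot-type"]))
# result is (1, "replace"); an empty input short-circuits to None
```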

Are these changes tested?

Includes full Pytest coverage in tests/table/test_maintenance.py.

Are there any user-facing changes?

Yes. This PR adds a new compact() method to the TableMaintenance API, allowing users to perform file compaction on existing Iceberg tables.

Example usage:

table = catalog.load_table("default.my_table")
# Merges small files into larger ones based on table properties
table.maintenance.compact()

Edit: It looks like I'm not able to add the changelog label, hopefully someone with permissions can do so.

qzyu999 added 2 commits March 5, 2026 21:32
This introduces a simplified, whole-table compaction strategy via the
MaintenanceTable API (`table.maintenance.compact()`).

Key implementation details:
- Reads the entire table state into memory via `.to_arrow()`.
- Uses `table.overwrite()` to rewrite data, leveraging PyIceberg's
  target file bin-packing (`write.target-file-size-bytes`) natively.
- Ensures atomicity by executing within a table transaction.
- Explicitly sets `snapshot-type: replace` and `replace-operation: compaction`
  to ensure correct metadata history for downstream engines.
- Includes a guard to safely ignore compaction requests on empty tables.

Includes full Pytest coverage in `tests/table/test_maintenance.py`.
Closes apache#1092

# Overwrite the table atomically (REPLACE operation)
with self.tbl.transaction() as txn:
    txn.overwrite(
        arrow_table,
        snapshot_properties={"snapshot-type": "replace", "replace-operation": "compaction"},
    )
Contributor (kevinjqliu):

I think we should have a REPLACE operation instead:
https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/DataOperations.html#REPLACE

We might want to create the .replace() first.

Author (qzyu999):

Hi @kevinjqliu, thanks for the insight. I agree with building a .replace() rather than just reusing overwrite. I've refactored the compaction run to properly use a .replace() API, following the design of the Java Iceberg implementation.

The approach is to create a new _RewriteFiles producer in pyiceberg/table/update/snapshot.py, which uses the new Operation.REPLACE. _RewriteFiles effectively mimics the _OverwriteFiles operation, except that it commits Operation.REPLACE instead of Operation.OVERWRITE. This allows MaintenanceTable.compact() to perform a proper txn.replace() rather than reuse txn.overwrite().

I also think it's worth noting that by adding Operation.REPLACE, we make room for the needed rewrite manifests (#270) and delete orphan files functionality (#1200).

after_files = list(table.scan().plan_files())
assert len(after_files) == 3 # Should be 1 optimized data file per partition
assert table.scan().to_arrow().num_rows == 120

Contributor (kevinjqliu):

Since it's a small result set, we should verify the data is the same too.

Author (qzyu999):

Hi @kevinjqliu, made a change in 6420027 to check that the columns and the primary keys remain the same before/after compaction.
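
One way such a check can look (a sketch with plain row dicts, not the actual test code): sort both sides by the primary key so the new file layout cannot affect the comparison.

```python
def canonical(rows, key="id"):
    # Sort by the primary key so row order (which changes when files
    # are rewritten) does not affect the equality check.
    return sorted(rows, key=lambda r: r[key])

before = [{"id": 2, "val": "b"}, {"id": 1, "val": "a"}]
after = [{"id": 1, "val": "a"}, {"id": 2, "val": "b"}]  # same rows, fewer files

assert canonical(before) == canonical(after)
```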

return ExpireSnapshots(transaction=Transaction(self.tbl, autocommit=True))

def compact(self) -> None:
"""Compact the table's data files by reading and overwriting the entire table.
Contributor (kevinjqliu):

This should be data and delete files, but generally it compacts the entire table.

Author (qzyu999), Mar 6, 2026:

Hi @kevinjqliu, made the update to the docstring here: 9fd51a8.

qzyu999 added 5 commits March 6, 2026 12:22
Formats the [compact](iceberg-python/pyiceberg/table/maintenance.py) method docstring to ensure the summary line does not wrap and correctly ends with a period, satisfying pydocstyle D205 and D400 rules.
Replaces the use of .overwrite() in MaintenanceTable.compact() with a new .replace() API backed by a _RewriteFiles producer. This ensures compaction now generates an Operation.REPLACE snapshot instead of Operation.OVERWRITE, preserving logical table state for downstream consumers.

Fixes apache#1092
for data_file in data_files:
    append_files.append_data_file(data_file)

def replace(
Contributor (kevinjqliu):

Let's add replace on its own, since it's a pretty significant change, and follow up with table compaction.

I think there are a few more things we need to add to the replace operation; it would be a good idea to look into the Java side. For example, how can we ensure that the table's data remains the same? REPLACE means no data change. If we cannot guarantee that the data remains the same, maybe we should not expose a replace function that takes a df as a parameter.

Author (qzyu999):

Hi @kevinjqliu, I created an issue (#3130) and a corresponding PR (#3131) to address the need to create a separate PR for replace. When approved, we can use that to build and complete this current PR for compaction. We can move this discussion to there and come back when finished.

@EnyMan commented Mar 19, 2026

I have been working on similar functionality for a while as part of my upsert optimization efforts: https://github.com/EnyMan/iceberg-python/blob/rewrite-data-files/pyiceberg/table/maintenance.py#L47. We have used it extensively in our production environment (10K+ rewrites). It is basically a clone of the Java version. I was planning on creating a PR but never got to it until now, and now I see there is already some work being done on it. Note that I use Operation.OVERWRITE instead of REPLACE, though.

@qzyu999 (Author) commented Mar 19, 2026

> I have been working on similar functionality for a while as part of my upsert optimization efforts: https://github.com/EnyMan/iceberg-python/blob/rewrite-data-files/pyiceberg/table/maintenance.py#L47. We have used it extensively in our production environment (10K+ rewrites). It is basically a clone of the Java version. I was planning on creating a PR but never got to it until now, and now I see there is already some work being done on it. Note that I use Operation.OVERWRITE instead of REPLACE, though.

Hi @EnyMan, thanks for sharing your work! I took a look at your code; IIUC, it adds the new files and deletes the old ones, i.e. an Operation.OVERWRITE, as you mentioned. I had done something similar in the beginning, but I now believe there is a flaw in that approach from the Java perspective:

  • OVERWRITE means new data is added to overwrite existing data
  • REPLACE means files are moved and replaced without changing the data in the table

This has impacts for time travel and conflict resolution.

  • If a snapshot is marked as REPLACE, the reader knows that the underlying files were strictly restructured (e.g., compacted from 10 small files to 1 large file) but no new logical records were inserted, updated, or deleted. The reader can safely ignore this snapshot.
  • If you use OVERWRITE for a compaction job, downstream processes may incorrectly perceive the compacted files as new data, potentially leading to duplicate processing.
  • During optimistic concurrency control, Iceberg uses the operation type to determine if two concurrent commits conflict. Because REPLACE strictly promises no logical changes, Iceberg's commit protocol can often safely re-apply a REPLACE operation alongside other concurrent data modifications (provided the specific files being replaced haven't been deleted).
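
The downstream effect described above can be sketched in a few lines (snapshots are modeled here as plain dicts, not PyIceberg's snapshot objects):

```python
# An incremental consumer walks the snapshot history and can skip
# REPLACE snapshots outright, since they promise no logical data change.
snapshots = [
    {"id": 1, "operation": "append"},
    {"id": 2, "operation": "replace"},  # compaction: files restructured only
    {"id": 3, "operation": "append"},
]
to_process = [s["id"] for s in snapshots if s["operation"] != "replace"]
# to_process is [1, 3]: the compaction snapshot is safely ignored
```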

For reasons I believe are related to the examples above, @kevinjqliu requested that we first implement the Operation.REPLACE functionality (#3130, #3131) and then come back to this issue/PR to complete the redesign. Your code appears to include many of the additional features that exist in Java's compaction function. As mentioned in #1092, the initial version of PyIceberg's compaction can start with a basic harness and iterate toward the level of completeness your implementation has in future issues/PRs. Following this logic, once #3130 and #1092 are completed, your code would be quite valuable for quickly implementing compaction and adding those additional features to PyIceberg.

  • Insights were assisted with AI


Development

Successfully merging this pull request may close these issues.

Support data files compaction

3 participants