-
Notifications
You must be signed in to change notification settings - Fork 99
feat: Add file cleanup for expire snapshots #592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,11 +23,17 @@ | |
| #include <cstdint> | ||
| #include <iterator> | ||
| #include <memory> | ||
| #include <optional> | ||
| #include <string> | ||
| #include <unordered_set> | ||
| #include <vector> | ||
|
|
||
| #include "iceberg/file_io.h" | ||
| #include "iceberg/manifest/manifest_entry.h" | ||
| #include "iceberg/manifest/manifest_reader.h" | ||
| #include "iceberg/schema.h" | ||
| #include "iceberg/snapshot.h" | ||
| #include "iceberg/statistics_file.h" | ||
| #include "iceberg/table.h" | ||
| #include "iceberg/table_metadata.h" | ||
| #include "iceberg/transaction.h" | ||
|
|
@@ -285,7 +291,247 @@ Result<ExpireSnapshots::ApplyResult> ExpireSnapshots::Apply() { | |
| }); | ||
| } | ||
|
|
||
| // Cache the result for use during Finalize() | ||
| apply_result_ = result; | ||
|
|
||
| return result; | ||
| } | ||
|
|
||
| Status ExpireSnapshots::Finalize(std::optional<Error> commit_error) { | ||
| if (commit_error.has_value()) { | ||
| return {}; | ||
| } | ||
|
|
||
| if (cleanup_level_ == CleanupLevel::kNone) { | ||
| return {}; | ||
| } | ||
|
|
||
| if (!apply_result_.has_value() || apply_result_->snapshot_ids_to_remove.empty()) { | ||
| return {}; | ||
| } | ||
|
|
||
| // File cleanup is best-effort: log and continue on individual file deletion failures | ||
| // to avoid blocking metadata updates (matching Java behavior). | ||
| return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove); | ||
| } | ||
|
|
||
| void ExpireSnapshots::DeleteFilePath(const std::string& path) { | ||
| try { | ||
| if (delete_func_) { | ||
| delete_func_(path); | ||
| } else { | ||
| auto status = ctx_->table->io()->DeleteFile(path); | ||
| // Best-effort: ignore NotFound (file already deleted) and other errors. | ||
| // Java uses suppressFailureWhenFinished + onFailure logging. | ||
| std::ignore = status; | ||
| } | ||
| } catch (...) { | ||
| // Suppress all exceptions during file cleanup to match Java's | ||
| // suppressFailureWhenFinished behavior. | ||
| } | ||
| } | ||
|
|
||
| Status ExpireSnapshots::ReadManifestsForSnapshot( | ||
| int64_t snapshot_id, std::unordered_set<std::string>& manifest_paths) { | ||
| const TableMetadata& metadata = base(); | ||
| auto file_io = ctx_->table->io(); | ||
|
|
||
| auto snapshot_result = metadata.SnapshotById(snapshot_id); | ||
| if (!snapshot_result.has_value()) { | ||
| return {}; | ||
| } | ||
| auto& snapshot = snapshot_result.value(); | ||
|
|
||
| SnapshotCache snapshot_cache(snapshot.get()); | ||
| auto manifests_result = snapshot_cache.Manifests(file_io); | ||
| if (!manifests_result.has_value()) { | ||
| // Best-effort: skip this snapshot if we can't read its manifests | ||
| return {}; | ||
| } | ||
|
|
||
| for (const auto& manifest : manifests_result.value()) { | ||
| manifest_paths.insert(manifest.manifest_path); | ||
| } | ||
|
|
||
| return {}; | ||
| } | ||
|
|
||
| Status ExpireSnapshots::FindDataFilesToDelete( | ||
| const std::unordered_set<std::string>& manifests_to_delete, | ||
| const std::unordered_set<std::string>& retained_manifests, | ||
| std::unordered_set<std::string>& data_files_to_delete) { | ||
| const TableMetadata& metadata = base(); | ||
| auto file_io = ctx_->table->io(); | ||
|
|
||
| // Step 1: Collect all file paths from manifests being deleted | ||
| for (const auto& manifest_path : manifests_to_delete) { | ||
| // Find the ManifestFile for this path by scanning expired snapshots | ||
| for (const auto& snapshot : metadata.snapshots) { | ||
| if (!snapshot) continue; | ||
| SnapshotCache snapshot_cache(snapshot.get()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Performance / Logic Issue: Instantiating Suggestion: Consider collecting |
||
| auto manifests_result = snapshot_cache.Manifests(file_io); | ||
| if (!manifests_result.has_value()) continue; | ||
|
|
||
| for (const auto& manifest : manifests_result.value()) { | ||
| if (manifest.manifest_path != manifest_path) continue; | ||
|
|
||
| auto schema_result = metadata.Schema(); | ||
| if (!schema_result.has_value()) continue; | ||
| auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id); | ||
| if (!spec_result.has_value()) continue; | ||
|
|
||
| auto reader_result = ManifestReader::Make( | ||
| manifest, file_io, schema_result.value(), spec_result.value()); | ||
| if (!reader_result.has_value()) continue; | ||
|
|
||
| auto entries_result = reader_result.value()->Entries(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Parity / Logic Issue: C++ uses Details: If a data file is added in an expired snapshot and deleted in a retained snapshot, using Suggestion: Change |
||
| if (!entries_result.has_value()) continue; | ||
|
|
||
| for (const auto& entry : entries_result.value()) { | ||
| if (entry.data_file) { | ||
| data_files_to_delete.insert(entry.data_file->file_path); | ||
| } | ||
| } | ||
| goto next_manifest; // Found and processed this manifest, move to next | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. C++ Style Issue: The use of |
||
| } | ||
| } | ||
| next_manifest:; | ||
| } | ||
|
|
||
| if (data_files_to_delete.empty()) { | ||
| return {}; | ||
| } | ||
|
|
||
| // Step 2: Remove any files that are still referenced by retained manifests. | ||
| // This ensures we don't delete files that are shared across manifests. | ||
| for (const auto& manifest_path : retained_manifests) { | ||
| if (data_files_to_delete.empty()) break; | ||
|
|
||
| for (const auto& snapshot : metadata.snapshots) { | ||
| if (!snapshot) continue; | ||
| SnapshotCache snapshot_cache(snapshot.get()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Performance / Logic Issue: Similar to line 371, instantiating |
||
| auto manifests_result = snapshot_cache.Manifests(file_io); | ||
| if (!manifests_result.has_value()) continue; | ||
|
|
||
| for (const auto& manifest : manifests_result.value()) { | ||
| if (manifest.manifest_path != manifest_path) continue; | ||
|
|
||
| auto schema_result = metadata.Schema(); | ||
| if (!schema_result.has_value()) continue; | ||
| auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id); | ||
| if (!spec_result.has_value()) continue; | ||
|
|
||
| auto reader_result = ManifestReader::Make( | ||
| manifest, file_io, schema_result.value(), spec_result.value()); | ||
| if (!reader_result.has_value()) continue; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inconsistent Behavior (Data Loss Risk): If reading a retained manifest fails, the C++ implementation silently ignores it ( |
||
|
|
||
| auto entries_result = reader_result.value()->Entries(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Parity / Logic Issue: Similar to line 387, use |
||
| if (!entries_result.has_value()) continue; | ||
|
|
||
| for (const auto& entry : entries_result.value()) { | ||
| if (entry.data_file) { | ||
| data_files_to_delete.erase(entry.data_file->file_path); | ||
| } | ||
| } | ||
| goto next_retained; | ||
| } | ||
| } | ||
| next_retained:; | ||
| } | ||
|
|
||
| return {}; | ||
| } | ||
|
|
||
| Status ExpireSnapshots::CleanExpiredFiles( | ||
| const std::vector<int64_t>& expired_snapshot_ids) { | ||
| const TableMetadata& metadata = base(); | ||
|
|
||
| // Build expired and retained snapshot ID sets. | ||
| // The retained set includes ALL snapshots referenced by any branch or tag, | ||
| // since Apply() already computed retention across all refs. | ||
| std::unordered_set<int64_t> expired_id_set(expired_snapshot_ids.begin(), | ||
| expired_snapshot_ids.end()); | ||
| std::unordered_set<int64_t> retained_snapshot_ids; | ||
| for (const auto& snapshot : metadata.snapshots) { | ||
| if (snapshot && !expired_id_set.contains(snapshot->snapshot_id)) { | ||
| retained_snapshot_ids.insert(snapshot->snapshot_id); | ||
| } | ||
| } | ||
|
|
||
| // Phase 1: Collect manifest paths from expired and retained snapshots. | ||
| // TODO(shangxinli): Parallelize manifest collection with a thread pool. | ||
| std::unordered_set<std::string> expired_manifest_paths; | ||
| for (int64_t snapshot_id : expired_snapshot_ids) { | ||
| std::ignore = ReadManifestsForSnapshot(snapshot_id, expired_manifest_paths); | ||
| } | ||
|
|
||
| std::unordered_set<std::string> retained_manifest_paths; | ||
| for (int64_t snapshot_id : retained_snapshot_ids) { | ||
| std::ignore = ReadManifestsForSnapshot(snapshot_id, retained_manifest_paths); | ||
| } | ||
|
|
||
| // Phase 2: Prune manifests still referenced by retained snapshots. | ||
| // Only manifests exclusively in expired snapshots should be deleted. | ||
| std::unordered_set<std::string> manifests_to_delete; | ||
| for (const auto& path : expired_manifest_paths) { | ||
| if (!retained_manifest_paths.contains(path)) { | ||
| manifests_to_delete.insert(path); | ||
| } | ||
| } | ||
|
|
||
| // Phase 3: If cleanup level is kAll, find data files to delete. | ||
| // Only read entries from manifests being deleted (not all expired manifests), | ||
| // then subtract any files still reachable from retained manifests. | ||
| if (cleanup_level_ == CleanupLevel::kAll && !manifests_to_delete.empty()) { | ||
| std::unordered_set<std::string> data_files_to_delete; | ||
| std::ignore = FindDataFilesToDelete(manifests_to_delete, retained_manifest_paths, | ||
| data_files_to_delete); | ||
|
|
||
| // TODO(shangxinli): Parallelize file deletion with a thread pool. | ||
| for (const auto& path : data_files_to_delete) { | ||
| DeleteFilePath(path); | ||
| } | ||
| } | ||
|
|
||
| // Phase 4: Delete orphaned manifest files. | ||
| for (const auto& path : manifests_to_delete) { | ||
| DeleteFilePath(path); | ||
| } | ||
|
|
||
| // Phase 5: Delete manifest lists from expired snapshots. | ||
| for (int64_t snapshot_id : expired_snapshot_ids) { | ||
| auto snapshot_result = metadata.SnapshotById(snapshot_id); | ||
| if (!snapshot_result.has_value()) continue; | ||
| auto& snapshot = snapshot_result.value(); | ||
| if (!snapshot->manifest_list.empty()) { | ||
| DeleteFilePath(snapshot->manifest_list); | ||
| } | ||
| } | ||
|
|
||
| // Phase 6: Delete expired statistics files. | ||
| // Use set difference between before and after states (matching Java behavior). | ||
| // Since Finalize runs before table_ is updated, "after" is base() minus expired. | ||
| std::unordered_set<int64_t> retained_stats_snapshots(retained_snapshot_ids); | ||
| for (const auto& stat_file : metadata.statistics) { | ||
| if (stat_file && !retained_stats_snapshots.contains(stat_file->snapshot_id)) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inconsistent Behavior: C++ deletes statistics files by checking if the |
||
| DeleteFilePath(stat_file->path); | ||
| } | ||
| } | ||
| for (const auto& part_stat : metadata.partition_statistics) { | ||
| if (part_stat && !retained_stats_snapshots.contains(part_stat->snapshot_id)) { | ||
| DeleteFilePath(part_stat->path); | ||
| } | ||
| } | ||
|
|
||
| return {}; | ||
| } | ||
|
|
||
| // TODO(shangxinli): Implement IncrementalFileCleanup strategy for linear ancestry | ||
| // optimization. Java uses this when: !specifiedSnapshotId && simple linear main branch | ||
| // ancestry (no non-main snapshots removed, no non-main snapshots remain). | ||
| // The incremental strategy is more efficient because it only needs to scan | ||
| // manifests written by expired snapshots (checking added_snapshot_id), avoiding | ||
| // the full reachability analysis. It also handles cherry-pick protection via | ||
| // SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP. | ||
|
|
||
| } // namespace iceberg | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
C++ Style Issue: Using mutable out-parameters (`std::unordered_set<std::string>& data_files_to_delete`) over `Result` is a less modern C++ pattern. Consider returning `Result<std::unordered_set<std::string>>`.