Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions src/iceberg/test/expire_snapshots_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

#include "iceberg/update/expire_snapshots.h"

#include <string>
#include <vector>

#include "iceberg/test/matchers.h"
#include "iceberg/test/update_test_base.h"

Expand Down Expand Up @@ -65,4 +68,88 @@ TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
}
}

TEST_F(ExpireSnapshotsTest, DeleteWithCustomFunction) {
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });

// Apply first so apply_result_ is cached
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);

// Call Finalize directly to simulate successful commit
// Note: Finalize tries to read manifests from the expired snapshot's manifest list,
// which will fail on mock FS since "s3://a/b/1.avro" doesn't contain real avro data.
// The error is returned from Finalize but in the real commit flow it's ignored.
auto finalize_status = update->Finalize(std::nullopt);
// Finalize may fail because manifest list files don't exist on mock FS,
// but it should not crash
if (finalize_status.has_value()) {
// If it succeeded (e.g., if manifest reading was skipped), verify deletions
EXPECT_FALSE(deleted_files.empty());
}
}

TEST_F(ExpireSnapshotsTest, CleanupLevelNoneSkipsFileDeletion) {
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->CleanupLevel(CleanupLevel::kNone);
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });

ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);

// With kNone cleanup level, Finalize should skip all file deletion
auto finalize_status = update->Finalize(std::nullopt);
EXPECT_THAT(finalize_status, IsOk());
EXPECT_TRUE(deleted_files.empty());
}

TEST_F(ExpireSnapshotsTest, FinalizeSkippedOnCommitError) {
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });

ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);

// Simulate a commit failure - Finalize should not delete any files
auto finalize_status = update->Finalize(
Error{.kind = ErrorKind::kCommitFailed, .message = "simulated failure"});
EXPECT_THAT(finalize_status, IsOk());
EXPECT_TRUE(deleted_files.empty());
}

TEST_F(ExpireSnapshotsTest, FinalizeSkippedWhenNoSnapshotsExpired) {
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->RetainLast(2);
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });

ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
EXPECT_TRUE(result.snapshot_ids_to_remove.empty());

// No snapshots expired, so Finalize should not delete any files
auto finalize_status = update->Finalize(std::nullopt);
EXPECT_THAT(finalize_status, IsOk());
EXPECT_TRUE(deleted_files.empty());
}

TEST_F(ExpireSnapshotsTest, CommitWithCleanupLevelNone) {
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->CleanupLevel(CleanupLevel::kNone);

// Commit should succeed - Finalize is called internally but skips cleanup
EXPECT_THAT(update->Commit(), IsOk());

// Verify snapshot was removed from metadata
auto metadata = ReloadMetadata();
EXPECT_EQ(metadata->snapshots.size(), 1);
EXPECT_EQ(metadata->snapshots.at(0)->snapshot_id, 3055729675574597004);
}

} // namespace iceberg
246 changes: 246 additions & 0 deletions src/iceberg/update/expire_snapshots.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@
#include <cstdint>
#include <iterator>
#include <memory>
#include <optional>
#include <string>
#include <unordered_set>
#include <vector>

#include "iceberg/file_io.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/statistics_file.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/transaction.h"
Expand Down Expand Up @@ -285,7 +291,247 @@ Result<ExpireSnapshots::ApplyResult> ExpireSnapshots::Apply() {
});
}

// Cache the result for use during Finalize()
apply_result_ = result;

return result;
}

Status ExpireSnapshots::Finalize(std::optional<Error> commit_error) {
if (commit_error.has_value()) {
return {};
}

if (cleanup_level_ == CleanupLevel::kNone) {
return {};
}

if (!apply_result_.has_value() || apply_result_->snapshot_ids_to_remove.empty()) {
return {};
}

// File cleanup is best-effort: log and continue on individual file deletion failures
// to avoid blocking metadata updates (matching Java behavior).
return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove);
}

void ExpireSnapshots::DeleteFilePath(const std::string& path) {
try {
if (delete_func_) {
delete_func_(path);
} else {
auto status = ctx_->table->io()->DeleteFile(path);
// Best-effort: ignore NotFound (file already deleted) and other errors.
// Java uses suppressFailureWhenFinished + onFailure logging.
std::ignore = status;
}
} catch (...) {
// Suppress all exceptions during file cleanup to match Java's
// suppressFailureWhenFinished behavior.
}
}

Status ExpireSnapshots::ReadManifestsForSnapshot(
int64_t snapshot_id, std::unordered_set<std::string>& manifest_paths) {
const TableMetadata& metadata = base();
auto file_io = ctx_->table->io();

auto snapshot_result = metadata.SnapshotById(snapshot_id);
if (!snapshot_result.has_value()) {
return {};
}
auto& snapshot = snapshot_result.value();

SnapshotCache snapshot_cache(snapshot.get());
auto manifests_result = snapshot_cache.Manifests(file_io);
if (!manifests_result.has_value()) {
// Best-effort: skip this snapshot if we can't read its manifests
return {};
}

for (const auto& manifest : manifests_result.value()) {
manifest_paths.insert(manifest.manifest_path);
}

return {};
}

Status ExpireSnapshots::FindDataFilesToDelete(
const std::unordered_set<std::string>& manifests_to_delete,
const std::unordered_set<std::string>& retained_manifests,
std::unordered_set<std::string>& data_files_to_delete) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C++ Style Issue: Using mutable out-parameters (std::unordered_set<std::string>& data_files_to_delete) over Result is a less modern C++ pattern. Consider returning Result<std::unordered_set<std::string>>.

const TableMetadata& metadata = base();
auto file_io = ctx_->table->io();

// Step 1: Collect all file paths from manifests being deleted
for (const auto& manifest_path : manifests_to_delete) {
// Find the ManifestFile for this path by scanning expired snapshots
for (const auto& snapshot : metadata.snapshots) {
if (!snapshot) continue;
SnapshotCache snapshot_cache(snapshot.get());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Performance / Logic Issue: Instantiating SnapshotCache locally inside the nested loop causes O(M*S) I/O complexity. The cache is not preserved across loop iterations, meaning for every manifest path being processed, the manifest list of every snapshot is repeatedly downloaded and parsed from storage.

Suggestion: Consider collecting ManifestFile objects in Phase 1 (similar to Java's Set<ManifestFile> deletionCandidates) or pre-caching them in a map to avoid redundant I/O.

auto manifests_result = snapshot_cache.Manifests(file_io);
if (!manifests_result.has_value()) continue;

for (const auto& manifest : manifests_result.value()) {
if (manifest.manifest_path != manifest_path) continue;

auto schema_result = metadata.Schema();
if (!schema_result.has_value()) continue;
auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id);
if (!spec_result.has_value()) continue;

auto reader_result = ManifestReader::Make(
manifest, file_io, schema_result.value(), spec_result.value());
if (!reader_result.has_value()) continue;

auto entries_result = reader_result.value()->Entries();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parity / Logic Issue: C++ uses Entries() which includes DELETED entries, whereas Java uses ManifestFiles.readPaths which delegates to liveEntries() (only ADDED and EXISTING).

Details: If a data file is added in an expired snapshot and deleted in a retained snapshot, using Entries() will read the DELETED entry in Phase 2 and subtract the data file from data_files_to_delete. This prevents the physical file from being deleted, resulting in a storage leak.

Suggestion: Change reader_result.value()->Entries() to reader_result.value()->LiveEntries() to strictly match Java's behavior.

if (!entries_result.has_value()) continue;

for (const auto& entry : entries_result.value()) {
if (entry.data_file) {
data_files_to_delete.insert(entry.data_file->file_path);
}
}
goto next_manifest; // Found and processed this manifest, move to next
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C++ Style Issue: The use of goto next_manifest; to break out of nested loops is a non-idiomatic C++ anti-pattern. Consider moving this manifest lookup logic into a helper function (e.g., std::optional<ManifestFile> GetManifestByPath(path)).

}
}
next_manifest:;
}

if (data_files_to_delete.empty()) {
return {};
}

// Step 2: Remove any files that are still referenced by retained manifests.
// This ensures we don't delete files that are shared across manifests.
for (const auto& manifest_path : retained_manifests) {
if (data_files_to_delete.empty()) break;

for (const auto& snapshot : metadata.snapshots) {
if (!snapshot) continue;
SnapshotCache snapshot_cache(snapshot.get());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Performance / Logic Issue: Similar to line 371, instantiating SnapshotCache here inside the loop repeats the O(M*S) I/O complexity.

auto manifests_result = snapshot_cache.Manifests(file_io);
if (!manifests_result.has_value()) continue;

for (const auto& manifest : manifests_result.value()) {
if (manifest.manifest_path != manifest_path) continue;

auto schema_result = metadata.Schema();
if (!schema_result.has_value()) continue;
auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id);
if (!spec_result.has_value()) continue;

auto reader_result = ManifestReader::Make(
manifest, file_io, schema_result.value(), spec_result.value());
if (!reader_result.has_value()) continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent Behavior (Data Loss Risk): If reading a retained manifest fails, the C++ implementation silently ignores it (continue). Java uses .retry(3) and .throwFailureWhenFinished(). If we silently ignore a read failure here, we will fail to subtract its live data files from data_files_to_delete, resulting in accidental data loss (deleting a physical file that is still actively used). Failures here should abort the deletion of those specific data files.


auto entries_result = reader_result.value()->Entries();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parity / Logic Issue: Similar to line 387, use LiveEntries() here instead of Entries() to avoid a potential storage leak.

if (!entries_result.has_value()) continue;

for (const auto& entry : entries_result.value()) {
if (entry.data_file) {
data_files_to_delete.erase(entry.data_file->file_path);
}
}
goto next_retained;
}
}
next_retained:;
}

return {};
}

Status ExpireSnapshots::CleanExpiredFiles(
const std::vector<int64_t>& expired_snapshot_ids) {
const TableMetadata& metadata = base();

// Build expired and retained snapshot ID sets.
// The retained set includes ALL snapshots referenced by any branch or tag,
// since Apply() already computed retention across all refs.
std::unordered_set<int64_t> expired_id_set(expired_snapshot_ids.begin(),
expired_snapshot_ids.end());
std::unordered_set<int64_t> retained_snapshot_ids;
for (const auto& snapshot : metadata.snapshots) {
if (snapshot && !expired_id_set.contains(snapshot->snapshot_id)) {
retained_snapshot_ids.insert(snapshot->snapshot_id);
}
}

// Phase 1: Collect manifest paths from expired and retained snapshots.
// TODO(shangxinli): Parallelize manifest collection with a thread pool.
std::unordered_set<std::string> expired_manifest_paths;
for (int64_t snapshot_id : expired_snapshot_ids) {
std::ignore = ReadManifestsForSnapshot(snapshot_id, expired_manifest_paths);
}

std::unordered_set<std::string> retained_manifest_paths;
for (int64_t snapshot_id : retained_snapshot_ids) {
std::ignore = ReadManifestsForSnapshot(snapshot_id, retained_manifest_paths);
}

// Phase 2: Prune manifests still referenced by retained snapshots.
// Only manifests exclusively in expired snapshots should be deleted.
std::unordered_set<std::string> manifests_to_delete;
for (const auto& path : expired_manifest_paths) {
if (!retained_manifest_paths.contains(path)) {
manifests_to_delete.insert(path);
}
}

// Phase 3: If cleanup level is kAll, find data files to delete.
// Only read entries from manifests being deleted (not all expired manifests),
// then subtract any files still reachable from retained manifests.
if (cleanup_level_ == CleanupLevel::kAll && !manifests_to_delete.empty()) {
std::unordered_set<std::string> data_files_to_delete;
std::ignore = FindDataFilesToDelete(manifests_to_delete, retained_manifest_paths,
data_files_to_delete);

// TODO(shangxinli): Parallelize file deletion with a thread pool.
for (const auto& path : data_files_to_delete) {
DeleteFilePath(path);
}
}

// Phase 4: Delete orphaned manifest files.
for (const auto& path : manifests_to_delete) {
DeleteFilePath(path);
}

// Phase 5: Delete manifest lists from expired snapshots.
for (int64_t snapshot_id : expired_snapshot_ids) {
auto snapshot_result = metadata.SnapshotById(snapshot_id);
if (!snapshot_result.has_value()) continue;
auto& snapshot = snapshot_result.value();
if (!snapshot->manifest_list.empty()) {
DeleteFilePath(snapshot->manifest_list);
}
}

// Phase 6: Delete expired statistics files.
// Use set difference between before and after states (matching Java behavior).
// Since Finalize runs before table_ is updated, "after" is base() minus expired.
std::unordered_set<int64_t> retained_stats_snapshots(retained_snapshot_ids);
for (const auto& stat_file : metadata.statistics) {
if (stat_file && !retained_stats_snapshots.contains(stat_file->snapshot_id)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent Behavior: C++ deletes statistics files by checking if the snapshot_id of the StatisticsFile is in the retained_stats_snapshots set. Java computes the set difference of actual file paths. If a statistics file path is referenced by multiple snapshots, deleting purely based on the expiration of a specific snapshot_id could erroneously delete a physical file that is still referenced by a newer, retained snapshot. Diff paths, not just snapshot IDs.

DeleteFilePath(stat_file->path);
}
}
for (const auto& part_stat : metadata.partition_statistics) {
if (part_stat && !retained_stats_snapshots.contains(part_stat->snapshot_id)) {
DeleteFilePath(part_stat->path);
}
}

return {};
}

// TODO(shangxinli): Implement IncrementalFileCleanup strategy for linear ancestry
// optimization. Java uses this when: !specifiedSnapshotId && simple linear main branch
// ancestry (no non-main snapshots removed, no non-main snapshots remain).
// The incremental strategy is more efficient because it only needs to scan
// manifests written by expired snapshots (checking added_snapshot_id), avoiding
// the full reachability analysis. It also handles cherry-pick protection via
// SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP.

} // namespace iceberg
Loading
Loading