Skip to content

Commit

Permalink
Core: Prevent incremental file cleanup when expiring specified snapsh…
Browse files Browse the repository at this point in the history
…ots (#10983)
  • Loading branch information
hantangwangd authored Sep 9, 2024
1 parent 153b070 commit 41d00ae
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 2 deletions.
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void accept(String file) {
private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;
private ExecutorService planExecutorService = ThreadPools.getWorkerPool();
private Boolean incrementalCleanup;
private boolean specifiedSnapshotId = false;

RemoveSnapshots(TableOperations ops) {
this.ops = ops;
Expand Down Expand Up @@ -116,6 +117,7 @@ public ExpireSnapshots cleanExpiredFiles(boolean clean) {
public ExpireSnapshots expireSnapshotId(long expireSnapshotId) {
LOG.info("Expiring snapshot with id: {}", expireSnapshotId);
idsToRemove.add(expireSnapshotId);
specifiedSnapshotId = true;
return this;
}

Expand Down Expand Up @@ -321,6 +323,15 @@ ExpireSnapshots withIncrementalCleanup(boolean useIncrementalCleanup) {
private void cleanExpiredSnapshots() {
TableMetadata current = ops.refresh();

if (specifiedSnapshotId) {
if (incrementalCleanup != null && incrementalCleanup) {
throw new UnsupportedOperationException(
"Cannot clean files incrementally when snapshot IDs are specified");
}

incrementalCleanup = false;
}

if (incrementalCleanup == null) {
incrementalCleanup = current.refs().size() == 1;
}
Expand Down
25 changes: 23 additions & 2 deletions core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ public void testRetainLastWithExpireById() {
}

// Retain last 3 snapshots, but explicitly remove the first snapshot
removeSnapshots(table).expireSnapshotId(firstSnapshotId).retainLast(3).commit();
table.expireSnapshots().expireSnapshotId(firstSnapshotId).retainLast(3).commit();

assertThat(table.snapshots()).hasSize(2);
assertThat(table.snapshot(firstSnapshotId)).isNull();
Expand Down Expand Up @@ -956,7 +956,8 @@ public void testWithExpiringStagedThenCherrypick() {
List<String> deletedFiles = Lists.newArrayList();

// Expire `B` commit.
removeSnapshots(table)
table
.expireSnapshots()
.deleteWith(deletedFiles::add)
.expireSnapshotId(snapshotB.snapshotId())
.commit();
Expand Down Expand Up @@ -1171,6 +1172,26 @@ public void testBranchExpiration() {
assertThat(table.ops().current().ref(SnapshotRef.MAIN_BRANCH)).isNotNull();
}

@TestTemplate
public void testIncrementalCleanupFailsWhenExpiringSnapshotId() {
table.newAppend().appendFile(FILE_A).commit();
table.newDelete().deleteFile(FILE_A).commit();
long snapshotId = table.currentSnapshot().snapshotId();
table.newAppend().appendFile(FILE_B).commit();
waitUntilAfter(table.currentSnapshot().timestampMillis());
RemoveSnapshots removeSnapshots = (RemoveSnapshots) table.expireSnapshots();

assertThatThrownBy(
() ->
removeSnapshots
.withIncrementalCleanup(true)
.expireSnapshotId(snapshotId)
.cleanExpiredFiles(true)
.commit())
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Cannot clean files incrementally when snapshot IDs are specified");
}

@TestTemplate
public void testMultipleRefsAndCleanExpiredFilesFailsForIncrementalCleanup() {
table.newAppend().appendFile(FILE_A).commit();
Expand Down

0 comments on commit 41d00ae

Please sign in to comment.