support rewrite_data_files on branches
amitgilad3 committed Feb 13, 2025
1 parent 8839c9b commit 05bf759
Showing 7 changed files with 159 additions and 5 deletions.
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -199,6 +199,16 @@ default RewriteDataFiles zOrder(String... columns) {
*/
RewriteDataFiles filter(Expression expression);

/**
* Specify the branch on which the rewrite will be performed.
*
* @param branch the branch where the rewrite happens
* @return this for chaining
*/
default RewriteDataFiles targetBranch(String branch) {
throw new UnsupportedOperationException("targetBranch not implemented");
}

/**
* A map of file group information to the results of rewriting that file group. If the results are
* null then that particular file group failed. We should only have failed groups if partial
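For context, a minimal sketch of how the new `targetBranch` hook might be invoked through the Spark actions API; `SparkActions` is the existing Spark entry point for `RewriteDataFiles`, and the table and branch names here are illustrative assumptions rather than part of this change.

```java
// Hypothetical usage sketch (table and branch names are assumptions):
// compact data files on an existing branch instead of main.
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.actions.SparkActions;

public class RewriteOnBranchExample {
  public static RewriteDataFiles.Result rewriteAuditBranch(Table table) {
    return SparkActions.get()
        .rewriteDataFiles(table)
        .targetBranch("audit") // the branch must already exist on the table
        .execute();
  }
}
```

Implementations that do not support branch rewrites fall back to the default method above, which throws `UnsupportedOperationException`.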
@@ -21,6 +21,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
@@ -39,6 +40,7 @@ public class RewriteDataFilesCommitManager {
private final long startingSnapshotId;
private final boolean useStartingSequenceNumber;
private final Map<String, String> snapshotProperties;
private final String targetBranch;

// constructor used for testing
public RewriteDataFilesCommitManager(Table table) {
@@ -51,7 +53,12 @@ public RewriteDataFilesCommitManager(Table table, long startingSnapshotId) {

public RewriteDataFilesCommitManager(
Table table, long startingSnapshotId, boolean useStartingSequenceNumber) {
this(table, startingSnapshotId, useStartingSequenceNumber, ImmutableMap.of());
this(
table,
startingSnapshotId,
useStartingSequenceNumber,
ImmutableMap.of(),
SnapshotRef.MAIN_BRANCH);
}

public RewriteDataFilesCommitManager(
@@ -63,6 +70,20 @@ public RewriteDataFilesCommitManager(
this.startingSnapshotId = startingSnapshotId;
this.useStartingSequenceNumber = useStartingSequenceNumber;
this.snapshotProperties = snapshotProperties;
this.targetBranch = SnapshotRef.MAIN_BRANCH;
}

public RewriteDataFilesCommitManager(
Table table,
long startingSnapshotId,
boolean useStartingSequenceNumber,
Map<String, String> snapshotProperties,
String branch) {
this.table = table;
this.startingSnapshotId = startingSnapshotId;
this.useStartingSequenceNumber = useStartingSequenceNumber;
this.snapshotProperties = snapshotProperties;
this.targetBranch = branch;
}

/**
@@ -88,7 +109,7 @@ public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
}

snapshotProperties.forEach(rewrite::set);

rewrite.toBranch(targetBranch);
rewrite.commit();
}

12 changes: 12 additions & 0 deletions docs/docs/spark-procedures.md
@@ -466,6 +466,18 @@ Rewrite the data files in table `db.sample` and select the files that may contain
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"');
```

Rewrite the data files in table `db.sample`, specifying the target branch using write-audit-publish (WAP):
```sql
ALTER TABLE db.sample SET TBLPROPERTIES ('write.wap.enabled'='true');
SET spark.wap.branch = audit;
CALL catalog_name.system.rewrite_data_files('db.sample');
```

Rewrite the data files in table `db.sample`, specifying the target branch using a branch identifier:
```sql
CALL catalog_name.system.rewrite_data_files('db.sample.branch_audit');
```

### `rewrite_manifests`

Rewrite manifests for a table to optimize scan planning.
@@ -30,6 +30,7 @@
import org.apache.iceberg.EnvironmentContext;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
@@ -38,6 +39,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.ExtendedParser;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.SystemFunctionPushDownHelper;
@@ -991,4 +993,64 @@ private List<Object[]> currentData() {
private List<Object[]> currentData(String table) {
return rowsToJava(spark.sql("SELECT * FROM " + table + " order by c1, c2, c3").collectAsList());
}

@TestTemplate
public void testRewriteOnBranchWap() throws Exception {
createPartitionTable();
// create 5 files for each partition (c2 = 'foo' and c2 = 'bar')
insertData(10);
Table table = Spark3Util.loadIcebergTable(spark, tableName);
String branch = "op_audit";
table.manageSnapshots().createBranch(branch).commit();
table.refresh();
long branchSnapshotId = table.currentSnapshot().snapshotId();
insertData(10);
table.refresh();
spark.sql(
String.format("ALTER TABLE %s SET TBLPROPERTIES ('write.wap.enabled'='true')", tableName));
spark.sql(String.format("SET spark.wap.branch = %s", branch));
long lastSnapshotId = table.currentSnapshot().snapshotId();
List<Object[]> output =
sql("CALL %s.system.rewrite_data_files(table => '%s')", catalogName, tableIdent);
assertThat(Arrays.copyOf(output.get(0), 2))
.as("Action should rewrite 10 data files and add 2 data files (one per partition)")
.containsExactly(row(10, 2));
table.refresh();
assertThat(table.refs().get(branch).snapshotId())
.as("branch ref should have changed")
.isNotEqualTo(branchSnapshotId);
assertThat(table.currentSnapshot().snapshotId())
.as("main branch ref should not have changed")
.isEqualTo(lastSnapshotId);
}

@TestTemplate
public void testRewriteOnBranch() throws Exception {
createPartitionTable();
// create 5 files for each partition (c2 = 'foo' and c2 = 'bar')
insertData(10);
Table table = Spark3Util.loadIcebergTable(spark, tableName);
String branch = "op_audit";
table.manageSnapshots().createBranch(branch).commit();
table.refresh();
long branchSnapshotId = table.currentSnapshot().snapshotId();
insertData(10);
table.refresh();
long lastSnapshotId = table.currentSnapshot().snapshotId();
TableIdentifier branchIdent =
TableIdentifier.of(
tableIdent.namespace(), String.format("%s.branch_%s", tableIdent.name(), branch));
List<Object[]> output =
sql("CALL %s.system.rewrite_data_files(table => '%s')", catalogName, branchIdent);
assertThat(Arrays.copyOf(output.get(0), 2))
.as("Action should rewrite 10 data files and add 2 data files (one per partition)")
.containsExactly(row(10, 2));
table.refresh();
assertThat(table.refs().get(branch).snapshotId())
.as("branch ref should have changed")
.isNotEqualTo(branchSnapshotId);
assertThat(table.currentSnapshot().snapshotId())
.as("main branch ref should not have changed")
.isEqualTo(lastSnapshotId);
}
}
@@ -25,6 +25,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.Path;
@@ -101,6 +103,7 @@ public class Spark3Util {
private static final Set<String> RESERVED_PROPERTIES =
ImmutableSet.of(TableCatalog.PROP_LOCATION, TableCatalog.PROP_PROVIDER);
private static final Joiner DOT = Joiner.on(".");
private static final Pattern BRANCH = Pattern.compile("branch_(.*)");

private Spark3Util() {}

@@ -228,6 +231,15 @@ private static void apply(UpdateSchema pendingUpdate, TableChange.UpdateColumnPo
}
}

public static String extractBranch(Identifier ident) {
Matcher branch = BRANCH.matcher(ident.name());
if (branch.matches()) {
return branch.group(1);
} else {
return null;
}
}

private static void apply(UpdateSchema pendingUpdate, TableChange.AddColumn add) {
Preconditions.checkArgument(
add.isNullable(),
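A small sketch of the identifier convention that `extractBranch` recognizes: a table identifier whose last part matches `branch_<name>` resolves to branch `<name>`, and anything else yields `null`. The identifier values below are assumptions for illustration.

```java
// Hypothetical illustration of the branch_<name> identifier convention (assumed values):
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.connector.catalog.Identifier;

public class ExtractBranchExample {
  public static void main(String[] args) {
    Identifier withBranch = Identifier.of(new String[] {"db", "sample"}, "branch_audit");
    Identifier plain = Identifier.of(new String[] {"db"}, "sample");

    System.out.println(Spark3Util.extractBranch(withBranch)); // audit
    System.out.println(Spark3Util.extractBranch(plain));      // null
  }
}
```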
@@ -35,6 +35,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
@@ -103,6 +104,7 @@ public class RewriteDataFilesSparkAction
private boolean useStartingSequenceNumber;
private RewriteJobOrder rewriteJobOrder;
private FileRewriter<FileScanTask, DataFile> rewriter = null;
private String targetBranch = SnapshotRef.MAIN_BRANCH;
private boolean caseSensitive;

RewriteDataFilesSparkAction(SparkSession spark, Table table) {
@@ -150,6 +152,26 @@ public RewriteDataFilesSparkAction zOrder(String... columnNames) {
return this;
}

@Override
public RewriteDataFiles targetBranch(String branch) {
this.targetBranch = branch;
SnapshotRef ref = table.refs().get(this.targetBranch);
Preconditions.checkArgument(
ref != null, String.format("Branch does not exist: %s", targetBranch));
Preconditions.checkArgument(
ref.isBranch(), String.format("Ref %s is not a branch", targetBranch));
return this;
}

protected long startingSnapshotId() {
if (SnapshotRef.MAIN_BRANCH.equals(this.targetBranch)) {
return table.currentSnapshot().snapshotId();
} else {
SnapshotRef ref = table.refs().get(this.targetBranch);
return ref.snapshotId();
}
}

@Override
public RewriteDataFilesSparkAction filter(Expression expression) {
filter = Expressions.and(filter, expression);
@@ -162,7 +184,7 @@ public RewriteDataFiles.Result execute() {
return EMPTY_RESULT;
}

long startingSnapshotId = table.currentSnapshot().snapshotId();
long startingSnapshotId = startingSnapshotId();

// Default to BinPack if no strategy selected
if (this.rewriter == null) {
@@ -276,7 +298,7 @@ private ExecutorService rewriteService() {
@VisibleForTesting
RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
return new RewriteDataFilesCommitManager(
table, startingSnapshotId, useStartingSequenceNumber, commitSummary());
table, startingSnapshotId, useStartingSequenceNumber, commitSummary(), targetBranch);
}

private Builder doExecute(
@@ -29,7 +29,10 @@
import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.ExtendedParser;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -116,7 +119,10 @@ public InternalRow[] call(InternalRow args) {
if (strategy != null || sortOrderString != null) {
action = checkAndApplyStrategy(action, strategy, sortOrderString, table.schema());
}

String branchIdent = Spark3Util.extractBranch(tableIdent);
SparkWriteConf writeConf =
new SparkWriteConf(spark(), table, branchIdent, Maps.newHashMap());
action = checkAndApplyBranch(action, writeConf);
action = checkAndApplyFilter(action, where, tableIdent);

RewriteDataFiles.Result result = action.execute();
@@ -125,6 +131,15 @@
});
}

private RewriteDataFiles checkAndApplyBranch(RewriteDataFiles action, SparkWriteConf writeConf) {

String targetBranch = writeConf.branch();
if (targetBranch != null) {
action.targetBranch(targetBranch);
}
return action;
}

private RewriteDataFiles checkAndApplyFilter(
RewriteDataFiles action, String where, Identifier ident) {
if (where != null) {