Skip to content

Commit

Permalink
Respond to comments
Browse files Browse the repository at this point in the history
  • Loading branch information
allisonport-db committed Sep 11, 2024
1 parent d5ec3a3 commit 0083d82
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class TableFeatures {
////////////////////

public static void validateReadSupportedTable(
Protocol protocol, Optional<Metadata> metadata, String tablePath) {
Protocol protocol, String tablePath, Optional<Metadata> metadata) {
switch (protocol.getMinReaderVersion()) {
case 1:
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,9 @@ public Clock getClock() {
return clock;
}

/** Returns {@code tablePath} wrapped as a {@link Path}. */
protected Path getDataPath() {
return new Path(tablePath);
}

/** Returns the {@link Path} built from {@code tablePath} and the child name {@code _delta_log}. */
protected Path getLogPath() {
return new Path(tablePath, "_delta_log");
}

/**
* Returns the raw delta actions for each version between startVersion and endVersion. Only reads
* the actions requested in actionSet from the JSON log files.
* Returns delta actions for each version between startVersion and endVersion. Only returns the
* actions requested in actionSet.
*
* <p>For the returned columnar batches:
*
Expand All @@ -149,24 +141,47 @@ protected Path getLogPath() {
* @throws KernelException if a commit file does not exist for any of the versions in the provided
* range
* @throws KernelException if provided an invalid version range
* @throws KernelException if the version range contains a version with reader protocol that is
* unsupported by Kernel
*/
private CloseableIterator<ColumnarBatch> getRawChanges(
public CloseableIterator<ColumnarBatch> getChanges(
Engine engine,
long startVersion,
long endVersion,
Set<DeltaLogActionUtils.DeltaAction> actionSet) {
// Create a new action set that always contains protocol
// (a copy is used so the caller's actionSet is never mutated)
Set<DeltaLogActionUtils.DeltaAction> copySet = new HashSet<>(actionSet);
copySet.add(DeltaLogActionUtils.DeltaAction.PROTOCOL);
// If protocol is not in the original requested actions we drop the column before returning
boolean shouldDropProtocolColumn =
!actionSet.contains(DeltaLogActionUtils.DeltaAction.PROTOCOL);

List<FileStatus> commitFiles =
DeltaLogActionUtils.getCommitFilesForVersionRange(
engine, new Path(tablePath), startVersion, endVersion);
// Read with the widened action set, then validate and reshape each batch as it is consumed.
return getRawChanges(engine, startVersion, endVersion, copySet)
.map(
batch -> {
// PROTOCOL was added to copySet above, so a protocol column is expected in every batch.
// NOTE(review): presumes DeltaAction.PROTOCOL.colName is "protocol" -- confirm.
int protocolIdx = batch.getSchema().indexOf("protocol"); // must exist
ColumnVector protocolVector = batch.getColumnVector(protocolIdx);
// Validate every row that actually carries a protocol action; null rows hold other actions.
for (int rowId = 0; rowId < protocolVector.getSize(); rowId++) {
if (!protocolVector.isNullAt(rowId)) {
Protocol protocol = Protocol.fromColumnVector(protocolVector, rowId);
// Metadata is passed as Optional.empty() here; only the protocol row is checked.
TableFeatures.validateReadSupportedTable(
protocol, getDataPath().toString(), Optional.empty());
}
}
// Hand back only the columns the caller asked for.
if (shouldDropProtocolColumn) {
return batch.withDeletedColumnAt(protocolIdx);
} else {
return batch;
}
});
}

StructType readSchema =
new StructType(
actionSet.stream()
.map(action -> new StructField(action.colName, action.schema, true))
.collect(Collectors.toList()));
/** Returns {@code tablePath} wrapped as a {@link Path}. */
protected Path getDataPath() {
return new Path(tablePath);
}

return DeltaLogActionUtils.readCommitFiles(engine, commitFiles, readSchema);
/** Returns the {@link Path} built from {@code tablePath} and the child name {@code _delta_log}. */
protected Path getLogPath() {
return new Path(tablePath, "_delta_log");
}

/**
Expand Down Expand Up @@ -194,38 +209,23 @@ private CloseableIterator<ColumnarBatch> getRawChanges(
* @throws KernelException if a commit file does not exist for any of the versions in the provided
* range
* @throws KernelException if provided an invalid version range
* @throws KernelException if the version range contains a version with reader protocol that is
* unsupported by Kernel
*/
public CloseableIterator<ColumnarBatch> getChanges(
private CloseableIterator<ColumnarBatch> getRawChanges(
Engine engine,
long startVersion,
long endVersion,
Set<DeltaLogActionUtils.DeltaAction> actionSet) {
// Create a new action set that always contains protocol
// (a copy is used so the caller's actionSet is never mutated)
Set<DeltaLogActionUtils.DeltaAction> copySet = new HashSet<>(actionSet);
copySet.add(DeltaLogActionUtils.DeltaAction.PROTOCOL);
// If protocol is not in the original requested actions we drop the column before returning
boolean shouldDropProtocolColumn =
!actionSet.contains(DeltaLogActionUtils.DeltaAction.PROTOCOL);

// Read with the widened action set, then validate and reshape each batch as it is consumed.
return getRawChanges(engine, startVersion, endVersion, copySet)
.map(
batch -> {
// PROTOCOL was added to copySet above, so a protocol column is expected in every batch.
// NOTE(review): presumes DeltaAction.PROTOCOL.colName is "protocol" -- confirm.
int protocolIdx = batch.getSchema().indexOf("protocol"); // must exist
ColumnVector protocolVector = batch.getColumnVector(protocolIdx);
// Validate every row that actually carries a protocol action; null rows hold other actions.
for (int rowId = 0; rowId < protocolVector.getSize(); rowId++) {
if (!protocolVector.isNullAt(rowId)) {
Protocol protocol = Protocol.fromColumnVector(protocolVector, rowId);
// Metadata is passed as Optional.empty() here; only the protocol row is checked.
TableFeatures.validateReadSupportedTable(
protocol, Optional.empty(), getDataPath().toString());
}
}
// Hand back only the columns the caller asked for.
if (shouldDropProtocolColumn) {
return batch.withDeletedColumnAt(protocolIdx);
} else {
return batch;
}
});
// Resolve the commit files for the requested version range under tablePath.
List<FileStatus> commitFiles =
DeltaLogActionUtils.getCommitFilesForVersionRange(
engine, new Path(tablePath), startVersion, endVersion);

// One top-level column per requested action, named and typed by the action itself.
// NOTE(review): the trailing `true` is presumably StructField's nullable flag -- confirm.
StructType readSchema =
new StructType(
actionSet.stream()
.map(action -> new StructField(action.colName, action.schema, true))
.collect(Collectors.toList()));

// Only the columns in readSchema are requested from the commit files.
return DeltaLogActionUtils.readCommitFiles(engine, commitFiles, readSchema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
if (protocol != null) {
// Stop since we have found the latest Protocol and Metadata.
TableFeatures.validateReadSupportedTable(
protocol, Optional.of(metadata), dataPath.toString());
protocol, dataPath.toString(), Optional.of(metadata));
return new Tuple2<>(protocol, metadata);
}

Expand Down

0 comments on commit 0083d82

Please sign in to comment.