Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] Adds protocol checks to the public getChanges API on TableImpl #3651

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.sql.Timestamp;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

/** Contains methods to create user-facing Delta exceptions. */
Expand Down Expand Up @@ -149,12 +150,13 @@ public static KernelException unsupportedReaderProtocol(
return new KernelException(message);
}

public static KernelException unsupportedReaderFeature(String tablePath, String readerFeature) {
public static KernelException unsupportedReaderFeature(
String tablePath, Set<String> unsupportedFeatures) {
String message =
String.format(
"Unsupported Delta reader feature: table `%s` requires reader table feature \"%s\" "
"Unsupported Delta reader features: table `%s` requires reader table features [%s] "
+ "which is unsupported by this version of Delta Kernel.",
tablePath, readerFeature);
tablePath, String.join(", ", unsupportedFeatures));
return new KernelException(message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private DeltaLogActionUtils() {}

/**
* Represents a Delta action. This is used to request which actions to read from the commit files
* in {@link TableImpl#getChangesByVersion(Engine, long, long, Set)}.
* in {@link TableImpl#getChanges(Engine, long, long, Set)}.
*
* <p>See the Delta protocol for more details
* https://github.com/delta-io/delta/blob/master/PROTOCOL.md#actions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.types.StructType;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;

/** Contains utility methods related to the Delta table feature support in protocol. */
Expand All @@ -43,34 +40,40 @@ public class TableFeatures {
}
});

private static final Set<String> SUPPORTED_READER_FEATURES =
Collections.unmodifiableSet(
new HashSet<String>() {
{
add("columnMapping");
add("deletionVectors");
add("timestampNtz");
add("vacuumProtocolCheck");
add("variantType-preview");
add("v2Checkpoint");
}
});

////////////////////
// Helper Methods //
////////////////////

public static void validateReadSupportedTable(
Protocol protocol, Metadata metadata, String tablePath) {
Protocol protocol, String tablePath, Optional<Metadata> metadata) {
switch (protocol.getMinReaderVersion()) {
case 1:
break;
case 2:
ColumnMapping.throwOnUnsupportedColumnMappingMode(metadata);
metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode);
break;
case 3:
List<String> readerFeatures = protocol.getReaderFeatures();
for (String readerFeature : readerFeatures) {
switch (readerFeature) {
case "columnMapping":
ColumnMapping.throwOnUnsupportedColumnMappingMode(metadata);
break;
case "deletionVectors": // fall through
case "timestampNtz": // fall through
case "vacuumProtocolCheck": // fall through
case "variantType-preview": // fall through
case "v2Checkpoint":
break;
default:
throw DeltaErrors.unsupportedReaderFeature(tablePath, readerFeature);
}
if (!SUPPORTED_READER_FEATURES.containsAll(readerFeatures)) {
Set<String> unsupportedFeatures = new HashSet<>(readerFeatures);
unsupportedFeatures.removeAll(SUPPORTED_READER_FEATURES);
throw DeltaErrors.unsupportedReaderFeature(tablePath, unsupportedFeatures);
}
if (readerFeatures.contains("columnMapping")) {
metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode);
}
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO;

import io.delta.kernel.*;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.CheckpointAlreadyExistsException;
import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.snapshot.SnapshotManager;
import io.delta.kernel.internal.util.Clock;
Expand All @@ -32,7 +34,9 @@
import io.delta.kernel.utils.FileStatus;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -112,6 +116,66 @@ public Clock getClock() {
return clock;
}

/**
* Returns delta actions for each version between startVersion and endVersion. Only returns the
* actions requested in actionSet.
*
* <p>For the returned columnar batches:
*
* <ul>
* <li>Each row within the same batch is guaranteed to have the same commit version
* <li>The batch commit versions are monotonically increasing
* <li>The top-level columns include "version", "timestamp", and the actions requested in
* actionSet. "version" and "timestamp" are the first and second columns in the schema,
* respectively. The remaining columns are based on the actions requested and each have the
* schema found in {@code DeltaAction.schema}.
* </ul>
*
* @param engine {@link Engine} instance to use in Delta Kernel.
* @param startVersion start version (inclusive)
* @param endVersion end version (inclusive)
* @param actionSet the actions to read and return from the JSON log files
* @return an iterator of batches where each row in the batch has exactly one non-null action and
* its commit version and timestamp
* @throws TableNotFoundException if the table does not exist or if it is not a delta table
* @throws KernelException if a commit file does not exist for any of the versions in the provided
* range
* @throws KernelException if provided an invalid version range
* @throws KernelException if the version range contains a version with reader protocol that is
* unsupported by Kernel
*/
public CloseableIterator<ColumnarBatch> getChanges(
Engine engine,
long startVersion,
long endVersion,
Set<DeltaLogActionUtils.DeltaAction> actionSet) {
// Create a new action set that always contains protocol
Set<DeltaLogActionUtils.DeltaAction> copySet = new HashSet<>(actionSet);
copySet.add(DeltaLogActionUtils.DeltaAction.PROTOCOL);
// If protocol is not in the original requested actions we drop the column before returning
boolean shouldDropProtocolColumn =
!actionSet.contains(DeltaLogActionUtils.DeltaAction.PROTOCOL);

return getRawChanges(engine, startVersion, endVersion, copySet)
.map(
batch -> {
int protocolIdx = batch.getSchema().indexOf("protocol"); // must exist
ColumnVector protocolVector = batch.getColumnVector(protocolIdx);
for (int rowId = 0; rowId < protocolVector.getSize(); rowId++) {
if (!protocolVector.isNullAt(rowId)) {
Protocol protocol = Protocol.fromColumnVector(protocolVector, rowId);
TableFeatures.validateReadSupportedTable(
protocol, getDataPath().toString(), Optional.empty());
}
}
if (shouldDropProtocolColumn) {
return batch.withDeletedColumnAt(protocolIdx);
} else {
return batch;
}
});
}

protected Path getDataPath() {
return new Path(tablePath);
}
Expand Down Expand Up @@ -146,7 +210,7 @@ protected Path getLogPath() {
* range
* @throws KernelException if provided an invalid version range
*/
public CloseableIterator<ColumnarBatch> getChangesByVersion(
private CloseableIterator<ColumnarBatch> getRawChanges(
Engine engine,
long startVersion,
long endVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(

if (protocol != null) {
// Stop since we have found the latest Protocol and Metadata.
TableFeatures.validateReadSupportedTable(protocol, metadata, dataPath.toString());
TableFeatures.validateReadSupportedTable(
protocol, dataPath.toString(), Optional.of(metadata));
return new Tuple2<>(protocol, metadata);
}

Expand Down
Loading
Loading