Skip to content

Commit

Permalink
[Kernel] [Refactor] Remove superfluous engine param from Snapshot APIs (
Browse files Browse the repository at this point in the history
#4101)

#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

Remove superfluous engine param from Snapshot APIs

## How was this patch tested?

Refactor only.

## Does this PR introduce _any_ user-facing changes?

Removes param from public API.
  • Loading branch information
scottsand-db authored Jan 29, 2025
1 parent c04cb06 commit ee98a22
Show file tree
Hide file tree
Showing 25 changed files with 97 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class KernelDeltaLogDelegator(
kernelSnapshotWrapper,
hadoopConf,
logPath,
kernelSnapshot.getVersion(engine), // note: engine isn't used
kernelSnapshot.getVersion(),
this,
standaloneDeltaLog
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ public Metadata getMetadata() {
*/
@Override
public long getVersion() {
// WARNING: getVersion in SnapshotImpl currently doesn't use the engine so we can
// pass null, but if this changes this code could break
return kernelSnapshot.getVersion(null);
return kernelSnapshot.getVersion();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,9 @@ public int show(int limit, Optional<List<String>> columnsOpt, Optional<Predicate
throws TableNotFoundException {
Table table = Table.forPath(engine, tablePath);
Snapshot snapshot = table.getLatestSnapshot(engine);
StructType readSchema = pruneSchema(snapshot.getSchema(engine), columnsOpt);
StructType readSchema = pruneSchema(snapshot.getSchema(), columnsOpt);

ScanBuilder scanBuilder = snapshot.getScanBuilder(engine)
.withReadSchema(engine, readSchema);
ScanBuilder scanBuilder = snapshot.getScanBuilder().withReadSchema(engine, readSchema);

if (predicate.isPresent()) {
scanBuilder = scanBuilder.withFilter(engine, predicate.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@ public int show(int limit, Optional<List<String>> columnsOpt, Optional<Predicate
throws TableNotFoundException, IOException {
Table table = Table.forPath(engine, tablePath);
Snapshot snapshot = table.getLatestSnapshot(engine);
StructType readSchema = pruneSchema(snapshot.getSchema(engine), columnsOpt);
StructType readSchema = pruneSchema(snapshot.getSchema(), columnsOpt);

ScanBuilder scanBuilder = snapshot.getScanBuilder(engine)
.withReadSchema(engine, readSchema);
ScanBuilder scanBuilder = snapshot.getScanBuilder().withReadSchema(engine, readSchema);

if (predicate.isPresent()) {
scanBuilder = scanBuilder.withFilter(engine, predicate.get());
Expand Down
12 changes: 4 additions & 8 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,19 @@ public interface Snapshot {
/**
* Get the version of this snapshot in the table.
*
* @param engine {@link Engine} instance to use in Delta Kernel.
* @return version of this snapshot in the Delta table
*/
long getVersion(Engine engine);
long getVersion();

/**
* Get the names of the partition columns in the Delta table at this snapshot.
*
* <p>The partition column names are returned in the order they are defined in the Delta table
* schema. If the table does not define any partition columns, this method returns an empty list.
*
* @param engine {@link Engine} instance to use in Delta Kernel.
* @return a list of partition column names, or an empty list if the table is not partitioned.
*/
List<String> getPartitionColumnNames(Engine engine);
List<String> getPartitionColumnNames();

/**
* Get the timestamp (in milliseconds since the Unix epoch) of the latest commit in this snapshot.
Expand All @@ -59,16 +57,14 @@ public interface Snapshot {
/**
* Get the schema of the table at this snapshot.
*
* @param engine {@link Engine} instance to use in Delta Kernel.
* @return Schema of the Delta table at this snapshot.
*/
StructType getSchema(Engine engine);
StructType getSchema();

/**
* Create a scan builder to construct a {@link Scan} to read data from this snapshot.
*
* @param engine {@link Engine} instance to use in Delta Kernel.
* @return an instance of {@link ScanBuilder}
*/
ScanBuilder getScanBuilder(Engine engine);
ScanBuilder getScanBuilder();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class ScanBuilderImpl implements ScanBuilder {
private final Metadata metadata;
private final StructType snapshotSchema;
private final LogReplay logReplay;
private final Engine engine;

private StructType readSchema;
private Optional<Predicate> predicate;
Expand All @@ -45,14 +44,12 @@ public ScanBuilderImpl(
Protocol protocol,
Metadata metadata,
StructType snapshotSchema,
LogReplay logReplay,
Engine engine) {
LogReplay logReplay) {
this.dataPath = dataPath;
this.protocol = protocol;
this.metadata = metadata;
this.snapshotSchema = snapshotSchema;
this.logReplay = logReplay;
this.engine = engine;
this.readSchema = snapshotSchema;
this.predicate = Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,15 @@ public SnapshotImpl(
/////////////////

@Override
public long getVersion(Engine engine) {
public long getVersion() {
return version;
}

@Override
public List<String> getPartitionColumnNames() {
return VectorUtils.toJavaList(getMetadata().getPartitionColumns());
}

/**
* Get the timestamp (in milliseconds since the Unix epoch) of the latest commit in this Snapshot.
* If the table does not yet exist (i.e. this Snapshot is being used to create the new table),
Expand Down Expand Up @@ -107,14 +112,14 @@ public long getTimestamp(Engine engine) {
}

@Override
public StructType getSchema(Engine engine) {
public StructType getSchema() {
return getMetadata().getSchema();
}

@Override
public ScanBuilder getScanBuilder(Engine engine) {
public ScanBuilder getScanBuilder() {
// TODO when we add ScanReport we will pass the SnapshotReport downstream here
return new ScanBuilderImpl(dataPath, protocol, metadata, getSchema(engine), logReplay, engine);
return new ScanBuilderImpl(dataPath, protocol, metadata, getSchema(), logReplay);
}

///////////////////
Expand All @@ -133,10 +138,6 @@ public Protocol getProtocol() {
return protocol;
}

public List<String> getPartitionColumnNames(Engine engine) {
return VectorUtils.toJavaList(getMetadata().getPartitionColumns());
}

public SnapshotReport getSnapshotReport() {
return snapshotReport;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public Transaction build(Engine engine) {
new InitialSnapshot(table.getDataPath(), logReplay, metadata, protocol, snapshotContext);
}

boolean isNewTable = snapshot.getVersion(engine) < 0;
boolean isNewTable = snapshot.getVersion() < 0;
validate(engine, snapshot, isNewTable);

boolean shouldUpdateMetadata = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public List<String> getPartitionColumns(Engine engine) {

@Override
public StructType getSchema(Engine engine) {
return readSnapshot.getSchema(engine);
return readSnapshot.getSchema();
}

public Optional<SetTransaction> getSetTxnOpt() {
Expand Down Expand Up @@ -166,11 +166,11 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> data
private TransactionCommitResult commitWithRetry(
Engine engine, CloseableIterable<Row> dataActions, TransactionMetrics transactionMetrics) {
try {
long commitAsVersion = readSnapshot.getVersion(engine) + 1;
long commitAsVersion = readSnapshot.getVersion() + 1;
// Generate the commit action with the inCommitTimestamp if ICT is enabled.
CommitInfo attemptCommitInfo = generateCommitAction(engine);
updateMetadataWithICTIfRequired(
engine, attemptCommitInfo.getInCommitTimestamp(), readSnapshot.getVersion(engine));
engine, attemptCommitInfo.getInCommitTimestamp(), readSnapshot.getVersion());

// If row tracking is supported, assign base row IDs and default row commit versions to any
// AddFile actions that do not yet have them. If the row ID high watermark changes, emit a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentW
// against the winning transactions
return new TransactionRebaseState(
lastWinningVersion,
getLastCommitTimestamp(
engine, lastWinningVersion, lastWinningTxn, winningCommitInfoOpt.get()),
getLastCommitTimestamp(lastWinningVersion, lastWinningTxn, winningCommitInfoOpt.get()),
updatedDataActions,
updatedDomainMetadatas);
}
Expand Down Expand Up @@ -343,8 +342,7 @@ private void handleTxn(ColumnVector txnVector) {
}

private List<FileStatus> getWinningCommitFiles(Engine engine) {
String firstWinningCommitFile =
deltaFile(snapshot.getLogPath(), snapshot.getVersion(engine) + 1);
String firstWinningCommitFile = deltaFile(snapshot.getLogPath(), snapshot.getVersion() + 1);

try (CloseableIterator<FileStatus> files =
wrapEngineExceptionThrowsIO(
Expand Down Expand Up @@ -374,18 +372,16 @@ private List<FileStatus> getWinningCommitFiles(Engine engine) {
* latest winning transaction commit file. For non-ICT enabled tables, this is the modification
* time of the latest winning transaction commit file.
*
* @param engine {@link Engine} instance to use
* @param lastWinningVersion last winning version of the table
* @param lastWinningTxn the last winning transaction commit file
* @param winningCommitInfoOpt winning commit info
* @return last commit timestamp of the table
*/
private long getLastCommitTimestamp(
Engine engine,
long lastWinningVersion,
FileStatus lastWinningTxn,
Optional<CommitInfo> winningCommitInfoOpt) {
if (snapshot.getVersion(engine) == -1
if (snapshot.getVersion() == -1
|| !IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(snapshot.getMetadata())) {
return lastWinningTxn.getModificationTime();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,7 @@ public void checkpoint(Engine engine, Clock clock, long version)

// Check if writing to the given table protocol version/features is supported in Kernel
validateWriteSupportedTable(
snapshot.getProtocol(),
snapshot.getMetadata(),
snapshot.getSchema(engine),
tablePath.toString());
snapshot.getProtocol(), snapshot.getMetadata(), snapshot.getSchema(), tablePath.toString());

Path checkpointPath = FileNames.checkpointFileSingular(logPath, version);

Expand Down Expand Up @@ -288,8 +285,7 @@ private SnapshotImpl createSnapshot(
startingFromStr);

final SnapshotHint hint =
new SnapshotHint(
snapshot.getVersion(engine), snapshot.getProtocol(), snapshot.getMetadata());
new SnapshotHint(snapshot.getVersion(), snapshot.getProtocol(), snapshot.getMetadata());

registerHint(hint);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private static boolean didCurrentTransactionEnableICT(
boolean isICTCurrentlyEnabled =
IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(currentTransactionMetadata);
boolean wasICTEnabledInReadSnapshot =
readSnapshot.getVersion(engine) != -1
readSnapshot.getVersion() != -1
&& IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(readSnapshot.getMetadata());
return isICTCurrentlyEnabled && !wasICTEnabledInReadSnapshot;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,12 @@ public static boolean partitionExists(
requireNonNull(snapshot, "snapshot is null");
requireNonNull(partitionPredicate, "partitionPredicate is null");

final Set<String> snapshotPartColNames =
new HashSet<>(snapshot.getPartitionColumnNames(engine));
final Set<String> snapshotPartColNames = new HashSet<>(snapshot.getPartitionColumnNames());

io.delta.kernel.internal.util.PartitionUtils.validatePredicateOnlyOnPartitionColumns(
partitionPredicate, snapshotPartColNames);

final Scan scan =
snapshot.getScanBuilder(engine).withFilter(engine, partitionPredicate).build();
final Scan scan = snapshot.getScanBuilder().withFilter(engine, partitionPredicate).build();

try (CloseableIterator<FilteredColumnarBatch> columnarBatchIter = scan.getScanFiles(engine)) {
while (columnarBatchIter.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void benchmark(BenchmarkData benchmarkData, Blackhole blackhole) throws E
Table table = Table.forPath(engine, testTablePath);

Snapshot snapshot = table.getLatestSnapshot(engine);
ScanBuilder scanBuilder = snapshot.getScanBuilder(engine);
ScanBuilder scanBuilder = snapshot.getScanBuilder();
Scan scan = scanBuilder.build();

// Scan state is not used, but get it so that we simulate the real use case.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class ActiveAddFilesLogReplayMetricsSuite extends AnyFunSuite with TestUtils {

val scanFileIter = Table.forPath(engine, tablePath)
.getLatestSnapshot(engine)
.getScanBuilder(engine)
.getScanBuilder()
.build()
.getScanFiles(engine)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class CheckpointV2ReadSuite extends AnyFunSuite with TestUtils with ExpressionTe
snapshotFromSpark.protocol.readerFeatureNames)
assert(snapshotImpl.getProtocol.getWriterFeatures.asScala.toSet ==
snapshotFromSpark.protocol.writerFeatureNames)
assert(snapshot.getVersion(defaultEngine) == snapshotFromSpark.version)
assert(snapshot.getVersion() == snapshotFromSpark.version)

// Validate that snapshot read from most recent checkpoint. For most cases, given a checkpoint
// interval of 2, this will be the most recent even version.
Expand All @@ -97,7 +97,7 @@ class CheckpointV2ReadSuite extends AnyFunSuite with TestUtils with ExpressionTe


// Validate AddFiles from sidecars found against Spark connector.
val scan = snapshot.getScanBuilder(defaultEngine).build()
val scan = snapshot.getScanBuilder().build()
val foundFiles =
collectScanFileRows(scan).map(InternalScanFileUtils.getAddFileStatus).map(
_.getPath.split('/').last).toSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ class DeltaTableReadsSuite extends AnyFunSuite with TestUtils {

// for now we don't support timestamp type partition columns so remove from read columns
val readCols = Table.forPath(defaultEngine, path).getLatestSnapshot(defaultEngine)
.getSchema(defaultEngine)
.getSchema()
.withoutField("as_timestamp")
.fields()
.asScala
Expand Down Expand Up @@ -680,7 +680,7 @@ class DeltaTableReadsSuite extends AnyFunSuite with TestUtils {
test("table protocol version greater than reader protocol version") {
val e = intercept[Exception] {
latestSnapshot(goldenTablePath("deltalog-invalid-protocol-version"))
.getScanBuilder(defaultEngine)
.getScanBuilder()
.build()
}
assert(e.getMessage.contains("Unsupported Delta protocol reader version"))
Expand Down
Loading

0 comments on commit ee98a22

Please sign in to comment.