Skip to content

Commit

Permalink
[Kernel] Support beforeOrAt and atOrAfter semantics for getting a com…
Browse files Browse the repository at this point in the history
…mit version from a timestamp (#3650)

<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

Adds the functions `getVersionBeforeOrAtTimestamp` and
`getVersionAtOrAfterTimestamp` to `TableImpl` for use with the
`getChangesByVersion` API to enable querying changes between two
timestamps.

## How was this patch tested?

Adds unit tests.
  • Loading branch information
allisonport-db authored Sep 9, 2024
1 parent 92f2068 commit 66440f0
Show file tree
Hide file tree
Showing 7 changed files with 574 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static io.delta.kernel.internal.fs.Path.getName;

import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.checkpoints.CheckpointInstance;
import io.delta.kernel.internal.fs.Path;
Expand All @@ -40,44 +41,66 @@ private DeltaHistoryManager() {}
private static final Logger logger = LoggerFactory.getLogger(DeltaHistoryManager.class);

/**
* Returns the latest recreatable commit that happened at or before {@code timestamp}. If the
* provided timestamp is after the timestamp of the latest version of the table throws an
* exception. If the provided timestamp is before the timestamp of the earliest version of the
* table throws an exception.
* Returns the latest commit that happened at or before {@code timestamp}.
*
* <p>If the timestamp is outside the range of [earliestCommit, latestCommit] then use parameters
* {@code canReturnLastCommit} and {@code canReturnEarliestCommit} to control whether an exception
* is thrown or the corresponding earliest/latest commit is returned.
*
* @param engine instance of {@link Engine} to use
* @param logPath the _delta_log path of the table
* @param timestamp the timestamp find the version for in milliseconds since the unix epoch
* @return the active recreatable commit version at the provided timestamp
* @param mustBeRecreatable whether the state at the returned commit should be recreatable
* @param canReturnLastCommit whether we can return the latest version of the table if the
* provided timestamp is after the latest commit
* @param canReturnEarliestCommit whether we can return the earliest version of the table if the
* provided timestamp is before the earliest commit
* @throws KernelException if the provided timestamp is before the earliest commit and
* canReturnEarliestCommit is false
* @throws KernelException if the provided timestamp is after the latest commit and
* canReturnLastCommit is false
* @throws TableNotFoundException when there is no Delta table at the given path
*/
public static long getActiveCommitAtTimestamp(Engine engine, Path logPath, long timestamp)
public static Commit getActiveCommitAtTimestamp(
Engine engine,
Path logPath,
long timestamp,
boolean mustBeRecreatable,
boolean canReturnLastCommit,
boolean canReturnEarliestCommit)
throws TableNotFoundException {

long earliestRecreatableCommit = getEarliestRecreatableCommit(engine, logPath);
long earliestVersion =
(mustBeRecreatable)
? getEarliestRecreatableCommit(engine, logPath)
: getEarliestDeltaFile(engine, logPath);

// Search for the commit
List<Commit> commits = getCommits(engine, logPath, earliestRecreatableCommit);
List<Commit> commits = getCommits(engine, logPath, earliestVersion);
Commit commit =
lastCommitBeforeOrAtTimestamp(commits, timestamp)
.orElseThrow(
() ->
DeltaErrors.timestampBeforeFirstAvailableCommit(
logPath.getParent().toString(), /* use dataPath */
timestamp,
commits.get(0).timestamp,
commits.get(0).version));
.orElse(commits.get(0)); // This is only returned if canReturnEarliestCommit (see below)

// If timestamp is before the earliest commit
if (commit.timestamp > timestamp && !canReturnEarliestCommit) {
throw DeltaErrors.timestampBeforeFirstAvailableCommit(
logPath.getParent().toString(), /* use dataPath */
timestamp,
commits.get(0).timestamp,
commits.get(0).version);
}
// If timestamp is after the last commit of the table
if (commit == commits.get(commits.size() - 1) && commit.timestamp < timestamp) {
if (commit == commits.get(commits.size() - 1)
&& commit.timestamp < timestamp
&& !canReturnLastCommit) {
throw DeltaErrors.timestampAfterLatestCommit(
logPath.getParent().toString(), /* use dataPath */
timestamp,
commit.timestamp,
commit.version);
}

return commit.version;
return commit;
}

/**
Expand Down Expand Up @@ -161,6 +184,30 @@ public static long getEarliestRecreatableCommit(Engine engine, Path logPath)
}
}

/**
* Get the earliest commit available for this table. Note that this version isn't guaranteed to
* exist when performing an action as a concurrent operation can delete the file during cleanup.
* This value must be used as a lower bound.
*/
public static long getEarliestDeltaFile(Engine engine, Path logPath)
throws TableNotFoundException {

try (CloseableIterator<FileStatus> files =
listFrom(engine, logPath, 0).filter(fs -> FileNames.isCommitFile(getName(fs.getPath())))) {

if (files.hasNext()) {
return FileNames.deltaVersion(files.next().getPath());
} else {
// listFrom already throws an error if the directory is truly empty, thus this must
// be because no files are delta files
throw new RuntimeException(
String.format("No delta files found in the directory: %s", logPath));
}
} catch (IOException e) {
throw new RuntimeException("Could not close iterator", e);
}
}

/**
* Returns an iterator containing a list of files found in the _delta_log directory starting with
* {@code startVersion}. Throws a {@link TableNotFoundException} if the directory doesn't exist or
Expand Down Expand Up @@ -242,16 +289,24 @@ private static Optional<Commit> lastCommitBeforeOrAtTimestamp(
return Optional.ofNullable((i < 0) ? null : commits.get(i));
}

private static class Commit {
public static class Commit {

final long version;
final long timestamp;
private final long version;
private final long timestamp;

Commit(long version, long timestamp) {
this.version = version;
this.timestamp = timestamp;
}

public long getVersion() {
return version;
}

public long getTimestamp() {
return timestamp;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,86 @@ protected Path getLogPath() {
return new Path(tablePath, "_delta_log");
}

/**
* Returns the latest version that was committed before or at {@code millisSinceEpochUTC}. If no
* version exists, throws a {@link KernelException}
*
* <p>Specifically:
*
* <ul>
* <li>if a commit version exactly matches the provided timestamp, we return it
* <li>else, we return the latest commit version with a timestamp less than the provided one
* <li>If the provided timestamp is less than the timestamp of any committed version, we throw
* an error.
* </ul>
*
* .
*
* @param millisSinceEpochUTC the number of milliseconds since midnight, January 1, 1970 UTC
* @return latest commit that happened before or at {@code timestamp}.
* @throws KernelException if the timestamp is less than the timestamp of any committed version
* @throws TableNotFoundException if no delta table is found
*/
public long getVersionBeforeOrAtTimestamp(Engine engine, long millisSinceEpochUTC) {
return DeltaHistoryManager.getActiveCommitAtTimestamp(
engine,
getLogPath(),
millisSinceEpochUTC,
false, /* mustBeRecreatable */
// e.g. if we give time T+2 and last commit has time T, then we DO want that last commit
true, /* canReturnLastCommit */
// e.g. we give time T-1 and first commit has time T, then do NOT want that earliest
// commit
false /* canReturnEarliestCommit */)
.getVersion();
}

/**
* Returns the latest version that was committed at or after {@code millisSinceEpochUTC}. If no
* version exists, throws a {@link KernelException}
*
* <p>Specifically:
*
* <ul>
* <li>if a commit version exactly matches the provided timestamp, we return it
* <li>else, we return the earliest commit version with a timestamp greater than the provided
* one
* <li>If the provided timestamp is larger than the timestamp of any committed version, we throw
* an error.
* </ul>
*
* .
*
* @param millisSinceEpochUTC the number of milliseconds since midnight, January 1, 1970 UTC
* @return latest commit that happened at or before {@code timestamp}.
* @throws KernelException if the timestamp is more than the timestamp of any committed version
* @throws TableNotFoundException if no delta table is found
*/
public long getVersionAtOrAfterTimestamp(Engine engine, long millisSinceEpochUTC) {
DeltaHistoryManager.Commit commit =
DeltaHistoryManager.getActiveCommitAtTimestamp(
engine,
getLogPath(),
millisSinceEpochUTC,
false, /* mustBeRecreatable */
// e.g. if we give time T+2 and last commit has time T, then we do NOT want that last
// commit
false, /* canReturnLastCommit */
// e.g. we give time T-1 and first commit has time T, then we DO want that earliest
// commit
true /* canReturnEarliestCommit */);

if (commit.getTimestamp() >= millisSinceEpochUTC) {
return commit.getVersion();
} else {
// this commit.timestamp is before the input timestamp. if this is the last commit, then
// the input timestamp is after the last commit and `getActiveCommitAtTimestamp` would have
// thrown an KernelException. So, clearly, this can't be the last commit, so we can safely
// return commit.version + 1 as the version that is at or after the input timestamp.
return commit.getVersion() + 1;
}
}

/**
* Returns the raw delta actions for each version between startVersion and endVersion. Only reads
* the actions requested in actionSet from the JSON log files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,14 @@ public Snapshot getSnapshotForTimestamp(Engine engine, long millisSinceEpochUTC)
throws TableNotFoundException {
long startTimeMillis = System.currentTimeMillis();
long versionToRead =
DeltaHistoryManager.getActiveCommitAtTimestamp(engine, logPath, millisSinceEpochUTC);
DeltaHistoryManager.getActiveCommitAtTimestamp(
engine,
logPath,
millisSinceEpochUTC,
true /* mustBeRecreatable */,
false /* canReturnLastCommit */,
false /* canReturnEarliestCommit */)
.getVersion();
logger.info(
"{}: Took {}ms to fetch version at timestamp {}",
tablePath,
Expand Down
Loading

0 comments on commit 66440f0

Please sign in to comment.