[Kernel] Load the protocol and metadata from the CRC files when available #4077

Merged Jan 31, 2025 (62 commits)
Changes from 46 commits
Commits
dd9fe7b
[Kernel] Support for loading protocol and metadata from checksum file…
vkorukanti May 15, 2024
df12241
refactor according to the comments
huan233usc Jan 21, 2025
a404846
add logging and fix comments
huan233usc Jan 22, 2025
1e410c5
move comments
huan233usc Jan 22, 2025
1994b2e
update comments
huan233usc Jan 22, 2025
ddc0815
update comments
huan233usc Jan 22, 2025
02067f3
replace static test table with test generated
huan233usc Jan 22, 2025
dec4938
use optional in building crc and use rfii in crc test
huan233usc Jan 22, 2025
6eff126
resolve comments
huan233usc Jan 22, 2025
69c29df
update comments
huan233usc Jan 22, 2025
fec680e
fix test
huan233usc Jan 22, 2025
60981a6
updated param name
huan233usc Jan 22, 2025
97bc44f
update doc
huan233usc Jan 22, 2025
2f5b1a2
fix idention
huan233usc Jan 22, 2025
c7bf343
update the doc to reflect nullness
huan233usc Jan 22, 2025
dbf4490
fix javafmt
huan233usc Jan 22, 2025
f19d48b
check crc info's version, use snapshot for lower bound
huan233usc Jan 22, 2025
25912ed
update comments
huan233usc Jan 22, 2025
44c8445
clean up tests
huan233usc Jan 23, 2025
4122d70
clean unused test methods
huan233usc Jan 23, 2025
017260c
revert unused test methods
huan233usc Jan 23, 2025
ab1115a
clean up unused import
huan233usc Jan 23, 2025
7d89e55
revert unused test methods
huan233usc Jan 23, 2025
2270126
resolve comments
huan233usc Jan 23, 2025
ab3d62e
handle edge case
huan233usc Jan 23, 2025
78bb03c
prefer use crc over checkpoint
huan233usc Jan 23, 2025
9949e90
fix java format
huan233usc Jan 23, 2025
e1532e7
add tests and fix listing bug
huan233usc Jan 23, 2025
69acace
refactor per comments
huan233usc Jan 23, 2025
9993886
handle version is 0
huan233usc Jan 23, 2025
70829a0
merge from latest version
huan233usc Jan 23, 2025
733eaed
fix java format
huan233usc Jan 23, 2025
4202b31
rever accident deleted changes in conflict resolve
huan233usc Jan 23, 2025
37f4251
refactor
huan233usc Jan 23, 2025
7494fd3
fix test attempt 1
huan233usc Jan 23, 2025
3f0e05f
fix test attempt 2
huan233usc Jan 23, 2025
4bb7bad
add tests
huan233usc Jan 24, 2025
e42b70e
fix indent
huan233usc Jan 24, 2025
18ecc74
fix test
huan233usc Jan 24, 2025
7383397
add docs
huan233usc Jan 24, 2025
afd5db1
fix comment
huan233usc Jan 24, 2025
ec3a24b
fix comment
huan233usc Jan 24, 2025
412e0fb
resolve comments
huan233usc Jan 24, 2025
e3d86a9
format java
huan233usc Jan 24, 2025
0dbea3f
update check
huan233usc Jan 24, 2025
588b464
format java
huan233usc Jan 24, 2025
155cf2b
update internal utils to move filter away
huan233usc Jan 24, 2025
8fe533b
fix comment
huan233usc Jan 28, 2025
119d0e0
take while
huan233usc Jan 28, 2025
c189537
adding header
huan233usc Jan 28, 2025
cf54166
fix typo
huan233usc Jan 28, 2025
21ed78f
fix typo
huan233usc Jan 28, 2025
5bf479e
switch to checkpoint opt
huan233usc Jan 29, 2025
8a96791
remove unused import
huan233usc Jan 29, 2025
9882740
merge with conflict
huan233usc Jan 30, 2025
d05450d
resolving conflict 1 - remove unnecessary files
huan233usc Jan 30, 2025
4e7078d
resolving conflict 2 fix tests
huan233usc Jan 30, 2025
2408be8
resolving conflict 3 resolve accidental deleted suites
huan233usc Jan 30, 2025
aa82629
empty line
huan233usc Jan 30, 2025
5b6c03b
fix year
huan233usc Jan 30, 2025
bdba22a
fix year
huan233usc Jan 30, 2025
a1611b8
use inmemory list
huan233usc Jan 30, 2025
Original file line number Diff line number Diff line change
@@ -258,7 +258,10 @@ private LogReplay getEmptyLogReplay(

@Override
protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
Engine engine, Optional<SnapshotHint> snapshotHint, long snapshotVersion) {
Engine engine,
LogSegment logSegment,
Optional<SnapshotHint> snapshotHint,
long snapshotVersion) {
return new Tuple2<>(protocol, metadata);
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.replay;

import static java.util.Objects.requireNonNull;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.types.StructType;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CRCInfo {
private static final Logger logger = LoggerFactory.getLogger(CRCInfo.class);

public static Optional<CRCInfo> fromColumnarBatch(
Engine engine, long version, ColumnarBatch batch, int rowId, String crcFilePath) {
Protocol protocol = Protocol.fromColumnVector(batch.getColumnVector(PROTOCOL_ORDINAL), rowId);
Metadata metadata = Metadata.fromColumnVector(batch.getColumnVector(METADATA_ORDINAL), rowId);
// protocol and metadata are nullable per fromColumnVector's implementation.
if (protocol == null || metadata == null) {
logger.warn("Invalid checksum file missing protocol and/or metadata: {}", crcFilePath);
return Optional.empty();
}
return Optional.of(new CRCInfo(version, metadata, protocol));
}

// We can add additional fields later
public static final StructType FULL_SCHEMA =
new StructType().add("protocol", Protocol.FULL_SCHEMA).add("metadata", Metadata.FULL_SCHEMA);

private static final int PROTOCOL_ORDINAL = 0;
private static final int METADATA_ORDINAL = 1;

private final long version;
private final Metadata metadata;
private final Protocol protocol;

protected CRCInfo(long version, Metadata metadata, Protocol protocol) {
this.version = version;
this.metadata = requireNonNull(metadata);
this.protocol = requireNonNull(protocol);
}

/** The version of the Delta table that this CRCInfo represents. */
public long getVersion() {
return version;
}

/** The {@link Metadata} stored in this CRCInfo. */
public Metadata getMetadata() {
return metadata;
}

/** The {@link Protocol} stored in this CRCInfo. */
public Protocol getProtocol() {
return protocol;
}
}
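The factory above returns an empty Optional when either field is missing, while the constructor still rejects nulls. A minimal sketch of that pattern follows. It assumes plain strings as stand-ins for `Protocol` and `Metadata`, and the class name `ChecksumInfoSketch` and method `fromNullable` are hypothetical, not part of the PR.

```java
import java.util.Objects;
import java.util.Optional;

/** Illustrative stand-in for CRCInfo; plain strings replace Protocol and Metadata. */
final class ChecksumInfoSketch {
  private final long version;
  private final String protocol;
  private final String metadata;

  // The constructor insists on non-null fields, like the requireNonNull calls in the diff.
  private ChecksumInfoSketch(long version, String protocol, String metadata) {
    this.version = version;
    this.protocol = Objects.requireNonNull(protocol);
    this.metadata = Objects.requireNonNull(metadata);
  }

  /** Returns empty instead of throwing when the parsed row lacks either field. */
  static Optional<ChecksumInfoSketch> fromNullable(
      long version, String protocol, String metadata) {
    if (protocol == null || metadata == null) {
      return Optional.empty();
    }
    return Optional.of(new ChecksumInfoSketch(version, protocol, metadata));
  }

  long getVersion() {
    return version;
  }

  String getProtocol() {
    return protocol;
  }

  String getMetadata() {
    return metadata;
  }
}
```

With this shape, callers can treat an incomplete checksum file as "no checksum available" and fall back to log replay rather than handling an exception.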
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.replay;

import static io.delta.kernel.internal.util.FileNames.*;
import static io.delta.kernel.internal.util.InternalUtils.toFilteredList;
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
import static java.lang.Math.min;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import java.io.IOException;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Utility methods to load the protocol and metadata from Delta log checksum files. */
public class ChecksumReader {
private static final Logger logger = LoggerFactory.getLogger(ChecksumReader.class);

/**
* Load the CRCInfo from the checksum file at the given version. If the checksum file is not found
* at the given version, it will try to find the latest checksum file that is created at or after
* the lower bound version.
*
* @param engine the engine to use for reading the checksum file
* @param logPath the path to the Delta log
* @param targetedVersion the target version to read the checksum file from
* @param lowerBound the inclusive lower bound version to search for the checksum file
* @return Optional {@link CRCInfo} containing the protocol and metadata, and the version of the
* checksum file. If no checksum file is found, returns an empty Optional.
*/
public static Optional<CRCInfo> getCRCInfo(
Engine engine, Path logPath, long targetedVersion, long lowerBound) {
// The lower bound must never exceed the targetedVersion.
lowerBound = min(lowerBound, targetedVersion);
logger.info("Loading CRC file for version {} with lower bound {}", targetedVersion, lowerBound);
// First try to load the CRC at given version. If not found or failed to read then try to
// find the latest CRC file that is created at or after the lower bound version.
Path crcFilePath = checksumFile(logPath, targetedVersion);
Optional<CRCInfo> crcInfoOpt = readChecksumFile(engine, crcFilePath);
if (crcInfoOpt.isPresent()
||
// we don't expect any more checksum files as it is the first version
targetedVersion == 0
|| targetedVersion == lowerBound) {
return crcInfoOpt;
}
logger.info(
"CRC file for version {} not found, listing CRC files from version {}",
targetedVersion,
lowerBound);

Path lowerBoundFilePath = checksumFile(logPath, lowerBound);
try (CloseableIterator<FileStatus> crcFiles =
engine.getFileSystemClient().listFrom(lowerBoundFilePath.toString())) {
List<FileStatus> crcFilesList =
toFilteredList(
crcFiles,
file ->
isChecksumFile(file.getPath())
&& checksumVersion(new Path(file.getPath())) <= targetedVersion);

// Pick the last file, which has the latest version among the CRC files found.
if (crcFilesList.isEmpty()) {
logger.warn("No checksum files found in the range {} to {}", lowerBound, targetedVersion);
return Optional.empty();
}

FileStatus latestCRCFile = crcFilesList.get(crcFilesList.size() - 1);
return readChecksumFile(engine, new Path(latestCRCFile.getPath()));
} catch (IOException e) {
logger.warn("Failed to list checksum files from {}", lowerBoundFilePath, e);
return Optional.empty();
}
}

private static Optional<CRCInfo> readChecksumFile(Engine engine, Path filePath) {
try (CloseableIterator<ColumnarBatch> iter =
engine
.getJsonHandler()
.readJsonFiles(
singletonCloseableIterator(FileStatus.of(filePath.toString())),
CRCInfo.FULL_SCHEMA,
Optional.empty())) {
// We do this instead of iterating through the rows or using `getSingularRow` so we
// can use the existing fromColumnVector methods in Protocol, Metadata, Format etc
if (!iter.hasNext()) {
logger.warn("Checksum file is empty: {}", filePath);
return Optional.empty();
}

ColumnarBatch batch = iter.next();
if (batch.getSize() != 1) {
String msg = "Expected exactly one row in the checksum file {}, found {} rows";
logger.warn(msg, filePath, batch.getSize());
return Optional.empty();
}

long crcVersion = FileNames.checksumVersion(filePath);

return CRCInfo.fromColumnarBatch(
engine, crcVersion, batch, 0 /* rowId */, filePath.toString());
} catch (Exception e) {
// This can happen when the version does not have a checksum file
logger.warn("Failed to read checksum file {}", filePath, e);
return Optional.empty();
}
}
}
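The fallback in `getCRCInfo` boils down to: among the listed files, keep the checksum files whose version lies in `[lowerBound, targetedVersion]` and take the latest. The sketch below illustrates that selection over plain file names. It is not the PR's implementation: it takes an in-memory list instead of calling `listFrom`, and it uses `max` rather than relying on the listing order. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.Optional;

final class LatestChecksumLookup {
  /**
   * Returns the highest checksum version v with lowerBound <= v <= targetVersion,
   * or empty if none of the given file names qualifies.
   */
  static Optional<Long> latestChecksumVersion(
      List<String> fileNames, long targetVersion, long lowerBound) {
    // Clamp the lower bound so it never exceeds the target, as the diff does with min().
    long effectiveLowerBound = Math.min(lowerBound, targetVersion);
    return fileNames.stream()
        // Keep only names of the form <digits>.crc.
        .filter(name -> name.matches("\\d+\\.crc"))
        // The version is the numeric text before the first dot.
        .map(name -> Long.parseLong(name.substring(0, name.indexOf('.'))))
        .filter(v -> v >= effectiveLowerBound && v <= targetVersion)
        .max(Long::compare);
  }
}
```

Using `max` makes the sketch independent of listing order; the code in the diff instead takes the last element of the filtered list, which assumes the listing is sorted by file name.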
Original file line number Diff line number Diff line change
@@ -17,6 +17,10 @@
package io.delta.kernel.internal.replay;

import static io.delta.kernel.internal.replay.LogReplayUtils.assertLogFilesBelongToTable;
import static io.delta.kernel.internal.util.FileNames.checkpointVersion;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.util.Arrays.asList;
import static java.util.Collections.max;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
@@ -38,9 +42,8 @@
import io.delta.kernel.utils.CloseableIterator;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;

/**
* Replays a history of actions, resolving them to produce the current state of the table. The
@@ -135,7 +138,7 @@ public LogReplay(
this.logSegment = logSegment;
this.protocolAndMetadata =
snapshotMetrics.loadInitialDeltaActionsTimer.time(
() -> loadTableProtocolAndMetadata(engine, snapshotHint, snapshotVersion));
() -> loadTableProtocolAndMetadata(engine, logSegment, snapshotHint, snapshotVersion));
// Lazy loading of domain metadata only when needed
this.domainMetadataMap = new Lazy<>(() -> loadDomainMetadataMap(engine));
}
@@ -201,13 +204,54 @@ public CloseableIterator<FilteredColumnarBatch> getAddFilesAsColumnarBatches(
* use the P and/or M from the hint.
*/
protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
Engine engine, Optional<SnapshotHint> snapshotHint, long snapshotVersion) {
Engine engine,
LogSegment logSegment,
Optional<SnapshotHint> snapshotHint,
long snapshotVersion) {

// Exit early if the hint already has the info we need
// Exit early if the hint already has the info we need.
if (snapshotHint.isPresent() && snapshotHint.get().getVersion() == snapshotVersion) {
return new Tuple2<>(snapshotHint.get().getProtocol(), snapshotHint.get().getMetadata());
}

// The snapshot hint is not usable for determining the lower bound in this case.
if (snapshotHint.isPresent() && snapshotHint.get().getVersion() > snapshotVersion) {
snapshotHint = Optional.empty();
}

// Finds the inclusive lower bound for CRC search.
// If a snapshot hint or a checkpoint no newer than the required version is present, we can
// use it as the lower bound for the CRC search.
List<Long> eligibleCheckpointVersions =
logSegment.checkpoints.stream()
.map(cp -> checkpointVersion(cp.getPath()))
.filter(v -> v <= snapshotVersion)
.collect(Collectors.toList());
long crcSearchLowerBound =
max(
asList(
snapshotHint.map(SnapshotHint::getVersion).orElse(0L) + 1,
eligibleCheckpointVersions.isEmpty() ? 0L : max(eligibleCheckpointVersions),
// Only find the CRC within 100 versions.
snapshotVersion - 100,
0L));
Optional<CRCInfo> crcInfoOpt =
ChecksumReader.getCRCInfo(engine, logSegment.logPath, snapshotVersion, crcSearchLowerBound);
if (crcInfoOpt.isPresent()) {
CRCInfo crcInfo = crcInfoOpt.get();
if (crcInfo.getVersion() == snapshotVersion) {
// The CRC is for the desired snapshot version. Load protocol and metadata from the CRC.
return new Tuple2<>(crcInfo.getProtocol(), crcInfo.getMetadata());
}
checkArgument(
crcInfo.getVersion() >= crcSearchLowerBound && crcInfo.getVersion() <= snapshotVersion);
// We found a CRCInfo of a version (a) older than the one we are looking for (snapshotVersion)
// but (b) newer than the current hint. Use this CRCInfo to create a new hint
snapshotHint =
Optional.of(
new SnapshotHint(crcInfo.getVersion(), crcInfo.getProtocol(), crcInfo.getMetadata()));
}

Protocol protocol = null;
Metadata metadata = null;

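The lower-bound computation in this hunk takes the maximum of four candidates: one past the hint version, the newest eligible checkpoint, `snapshotVersion - 100`, and 0. The following sketch restates that arithmetic in isolation so the cases are easier to check. It assumes plain `long` versions in place of `SnapshotHint` and `LogSegment`, and the class name, method name, and constant are hypothetical.

```java
import java.util.List;
import java.util.OptionalLong;

final class CrcSearchLowerBound {
  // Mirrors the "snapshotVersion - 100" window in the hunk.
  static final long MAX_VERSIONS_TO_SCAN = 100;

  static long compute(
      OptionalLong hintVersion, List<Long> checkpointVersions, long snapshotVersion) {
    // A hint newer than the snapshot is ignored; a hint at exactly the snapshot version is
    // handled by the early return in the real method and is not expected here.
    boolean hintUsable = hintVersion.isPresent() && hintVersion.getAsLong() < snapshotVersion;
    // As in the hunk, a missing hint contributes 0 + 1 = 1.
    long afterHint = (hintUsable ? hintVersion.getAsLong() : 0L) + 1;
    // Newest checkpoint at or below the snapshot version, or 0 if there is none.
    long latestCheckpoint =
        checkpointVersions.stream()
            .filter(v -> v <= snapshotVersion)
            .mapToLong(Long::longValue)
            .max()
            .orElse(0L);
    long window = snapshotVersion - MAX_VERSIONS_TO_SCAN;
    return Math.max(Math.max(afterHint, latestCheckpoint), Math.max(window, 0L));
  }
}
```

For example, with a hint at version 10, checkpoints at 20 and 40, and a snapshot version of 30, the candidates are 11, 20, -70, and 0, so the search starts at 20.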
Original file line number Diff line number Diff line change
@@ -52,6 +52,9 @@ private FileNames() {}
private static final Pattern CLASSIC_CHECKPOINT_FILE_PATTERN =
Pattern.compile("\\d+\\.checkpoint\\.parquet");

/** Example: 00000000000000000001.crc */
private static final Pattern CHECK_SUM_FILE_PATTERN = Pattern.compile("(\\d+)\\.crc");

/**
* Examples:
*
@@ -84,8 +87,8 @@ public static long getFileVersion(Path path) {
return checkpointVersion(path);
} else if (isCommitFile(path.getName())) {
return deltaVersion(path);
// } else if (isChecksumFile(path)) {
// checksumVersion(path);
} else if (isChecksumFile(path.getName())) {
return checksumVersion(path);
} else {
throw new IllegalArgumentException(
String.format("Unexpected file type found in transaction log: %s", path));
@@ -128,6 +131,15 @@ public static String sidecarFile(Path path, String sidecar) {
return String.format("%s/%s/%s", path.toString(), SIDECAR_DIRECTORY, sidecar);
}

/** Returns the path to the checksum file for the given version. */
public static Path checksumFile(Path path, long version) {
return new Path(path, String.format("%020d.crc", version));
}

public static long checksumVersion(Path path) {
return Long.parseLong(path.getName().split("\\.")[0]);
}

/**
* Returns the prefix of all delta log files for the given version.
*
@@ -206,4 +218,8 @@ public static boolean isCommitFile(String path) {
return DELTA_FILE_PATTERN.matcher(fileName).matches()
|| UUID_DELTA_FILE_REGEX.matcher(fileName).matches();
}

public static boolean isChecksumFile(String checksumFilePath) {
return CHECK_SUM_FILE_PATTERN.matcher(new Path(checksumFilePath).getName()).matches();
}
}
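The three checksum helpers added to `FileNames` form a round trip: format a version into a 20-digit zero-padded `.crc` name, recognize such a name, and parse the version back out. A minimal sketch of that round trip follows, using plain strings instead of the Kernel `Path` type. The class `ChecksumFileNames` and its `lastSegment` helper are hypothetical names used for illustration only.

```java
import java.util.regex.Pattern;

final class ChecksumFileNames {
  // Same shape as the pattern in the hunk: digits followed by ".crc".
  private static final Pattern CHECKSUM_FILE = Pattern.compile("(\\d+)\\.crc");

  /** File name of the checksum for a version, zero-padded to 20 digits. */
  static String checksumFileName(long version) {
    return String.format("%020d.crc", version);
  }

  /** True when the last path segment looks like a checksum file. */
  static boolean isChecksumFile(String path) {
    return CHECKSUM_FILE.matcher(lastSegment(path)).matches();
  }

  /** Parses the version from the file name (the text before the first dot). */
  static long checksumVersion(String path) {
    return Long.parseLong(lastSegment(path).split("\\.")[0]);
  }

  // Assumes '/' as the separator; the real code delegates this to Path#getName.
  private static String lastSegment(String path) {
    return path.substring(path.lastIndexOf('/') + 1);
  }
}
```

Zero-padding keeps lexicographic order equal to numeric order, which is what allows a sorted directory listing to be read as a version-ordered list.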
Original file line number Diff line number Diff line change
@@ -30,9 +30,8 @@
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

public class InternalUtils {
@@ -169,4 +168,11 @@ public static Path relativizePath(Path child, URI root) {
public static Set<String> toLowerCaseSet(Collection<String> set) {
return set.stream().map(String::toLowerCase).collect(Collectors.toSet());
}

public static <T> List<T> toFilteredList(
CloseableIterator<T> iterator, Function<T, Boolean> filter) {
List<T> result = new ArrayList<>();
iterator.filter(filter).forEachRemaining(result::add);
return result;
}
Collaborator review comment:

Sorry nit, I think it's easier if this is just toList and we can do the iterator filtering in the outer call. This way we can re-use this even when we don't want to filter

i.e. toList(iterator.filter(filter))

}
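The review comment suggests splitting the helper in two: a plain `toList` that only drains an iterator, with filtering applied by the caller. A rough sketch of that split is shown here against `java.util.Iterator`, which has no `filter` method, so a small filtering wrapper stands in for `CloseableIterator#filter`. The class `IteratorLists` and both method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

final class IteratorLists {
  /** Drains the iterator into a new list, with no filtering. */
  static <T> List<T> toList(Iterator<T> iterator) {
    List<T> result = new ArrayList<>();
    iterator.forEachRemaining(result::add);
    return result;
  }

  /** Lazily filtering view; a stand-in for CloseableIterator#filter. */
  static <T> Iterator<T> filter(Iterator<T> source, Predicate<T> keep) {
    return new Iterator<T>() {
      private T buffered;
      private boolean hasBuffered;

      @Override
      public boolean hasNext() {
        // Advance the source until an element passes the predicate or the source is drained.
        while (!hasBuffered && source.hasNext()) {
          T candidate = source.next();
          if (keep.test(candidate)) {
            buffered = candidate;
            hasBuffered = true;
          }
        }
        return hasBuffered;
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        hasBuffered = false;
        T out = buffered;
        buffered = null;
        return out;
      }
    };
  }
}
```

A call site then reads `toList(filter(iterator, predicate))`, and `toList` stays reusable when no filtering is needed.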
Original file line number Diff line number Diff line change
@@ -75,6 +75,16 @@ public static FileStatus of(String path, long size, long modificationTime) {
return new FileStatus(path, size, modificationTime);
}

/**
* Create a {@link FileStatus} with the given path with size and modification time set to 0.
*
* @param path Fully qualified file path.
* @return {@link FileStatus} object
*/
public static FileStatus of(String path) {
return new FileStatus(path, 0 /* size */, 0 /* modTime */);
}

@Override
public boolean equals(Object o) {
if (this == o) {