Skip to content

Commit

Permalink
[Kernel] Minor refactor to DeltaLogActionUtils; add CloseableIterator…
Browse files Browse the repository at this point in the history
… takeWhile and other helpful methods (#4097)

#### Which Delta project/connector is this regarding?
- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

This PR does the following:
- changes our DeltaLogActionUtils `listDeltaLogFiles` method to return
an iterator. This will eventually let it be used in a followup PR to
further consolidate and clean up code in DeltaHistoryManager (which
lists the delta log and expects an iterator)
- adds CloseabelIterator::takeWhile, breakableFilter, and toInMemoryList

## How was this patch tested?

- New UTs

## Does this PR introduce _any_ user-facing changes?

No.
  • Loading branch information
scottsand-db authored Jan 30, 2025
1 parent ee98a22 commit 75d6b8e
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
import io.delta.kernel.internal.util.FileNames.DeltaLogFileType;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.CloseableIterator.BreakableFilterResult;
import io.delta.kernel.utils.FileStatus;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -103,13 +105,14 @@ public static List<FileStatus> getCommitFilesForVersionRange(

// Get any available commit files within the version range
final List<FileStatus> commitFiles =
listDeltaLogFiles(
engine,
Collections.singleton(DeltaLogFileType.COMMIT),
tablePath,
startVersion,
Optional.of(endVersion),
false /* mustBeRecreatable */);
listDeltaLogFilesAsIter(
engine,
Collections.singleton(DeltaLogFileType.COMMIT),
tablePath,
startVersion,
Optional.of(endVersion),
false /* mustBeRecreatable */)
.toInMemoryList();

// There are no available commit files within the version range.
// This can be due to (1) an empty directory, (2) no valid delta files in the directory,
Expand Down Expand Up @@ -181,14 +184,14 @@ public static CloseableIterator<ColumnarBatch> readCommitFiles(
}

/**
* Returns the list of files of type $fileTypes in the _delta_log directory of the given
* $tablePath, in increasing order from $startVersion to the optional $endVersion.
* Returns a {@link CloseableIterator} of files of type $fileTypes in the _delta_log directory of
* the given $tablePath, in increasing order from $startVersion to the optional $endVersion.
*
* @throws TableNotFoundException if the table or its _delta_log does not exist
* @throws KernelException if mustBeRecreatable is true, endVersionOpt is present, and the
* _delta_log history has been truncated so that we cannot load the desired end version
*/
public static List<FileStatus> listDeltaLogFiles(
public static CloseableIterator<FileStatus> listDeltaLogFilesAsIter(
Engine engine,
Set<DeltaLogFileType> fileTypes,
Path tablePath,
Expand All @@ -215,69 +218,58 @@ public static List<FileStatus> listDeltaLogFiles(
startVersion,
endVersionOpt);

final List<FileStatus> output = new ArrayList<>();
final long startTimeMillis = System.currentTimeMillis();

try (CloseableIterator<FileStatus> fsIter = listLogDir(engine, tablePath, startVersion)) {
while (fsIter.hasNext()) {
final FileStatus fs = fsIter.next();

if (fileTypes.contains(DeltaLogFileType.COMMIT)
&& FileNames.isCommitFile(getName(fs.getPath()))) {
// Here, we do nothing (we will consume this file).
} else if (fileTypes.contains(DeltaLogFileType.CHECKPOINT)
&& FileNames.isCheckpointFile(getName(fs.getPath()))
&& fs.getSize() > 0) {
// Checkpoint files of 0 size are invalid but may be ignored silently when read, hence we
// ignore them so that we never pick up such checkpoints.
// Here, we do nothing (we will consume this file).
} else {
logger.debug("Ignoring file {} as it is not of the desired type", fs.getPath());
continue; // Here, we continue and skip this file.
}

final long fileVersion = FileNames.getFileVersion(new Path(fs.getPath()));

if (fileVersion < startVersion) {
throw new RuntimeException(
String.format(
"Listing files in %s with startVersion %s yet found file %s with version %s",
logPath, startVersion, fs.getPath(), fileVersion));
}

if (endVersionOpt.isPresent()) {
final long endVersion = endVersionOpt.get();

if (fileVersion > endVersion) {
if (mustBeRecreatable && output.isEmpty()) {
final long earliestVersion =
DeltaHistoryManager.getEarliestRecreatableCommit(engine, logPath);
throw DeltaErrors.versionBeforeFirstAvailableCommit(
tablePath.toString(), endVersion, earliestVersion);
} else {
logger.debug(
"Stopping listing; found file {} with version > {}=endVersion",
fs.getPath(),
endVersion);
break;
}
}
}

output.add(fs);
}
} catch (IOException e) {
throw new UncheckedIOException("Unable to close resource", e);
}

logger.info(
"{}: Took {} ms to list the commit files for versions [{}, {}]",
tablePath,
System.currentTimeMillis() - startTimeMillis,
startVersion,
endVersionOpt);

return output;
// Must be final to be used in lambda
final AtomicBoolean hasReturnedAnElement = new AtomicBoolean(false);

return listLogDir(engine, tablePath, startVersion)
.breakableFilter(
fs -> {
if (fileTypes.contains(DeltaLogFileType.COMMIT)
&& FileNames.isCommitFile(getName(fs.getPath()))) {
// Here, we do nothing (we will consume this file).
} else if (fileTypes.contains(DeltaLogFileType.CHECKPOINT)
&& FileNames.isCheckpointFile(getName(fs.getPath()))
&& fs.getSize() > 0) {
// Checkpoint files of 0 size are invalid but may be ignored silently when read,
// hence we ignore them so that we never pick up such checkpoints.
// Here, we do nothing (we will consume this file).
} else {
logger.debug("Ignoring file {} as it is not of the desired type", fs.getPath());
return BreakableFilterResult.EXCLUDE; // Here, we exclude and filter out this file.
}

final long fileVersion = FileNames.getFileVersion(new Path(fs.getPath()));

if (fileVersion < startVersion) {
throw new RuntimeException(
String.format(
"Listing files in %s with startVersion %s yet found file %s.",
logPath, startVersion, fs.getPath()));
}

if (endVersionOpt.isPresent()) {
final long endVersion = endVersionOpt.get();

if (fileVersion > endVersion) {
if (mustBeRecreatable && !hasReturnedAnElement.get()) {
final long earliestVersion =
DeltaHistoryManager.getEarliestRecreatableCommit(engine, logPath);
throw DeltaErrors.versionBeforeFirstAvailableCommit(
tablePath.toString(), endVersion, earliestVersion);
} else {
logger.debug(
"Stopping listing; found file {} with version greater than endVersion {}",
fs.getPath(),
endVersion);
return BreakableFilterResult.BREAK;
}
}
}

hasReturnedAnElement.set(true);

return BreakableFilterResult.INCLUDE;
});
}

//////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,15 @@ public LogSegment getLogSegmentForVersion(Engine engine, Optional<Long> versionT

final long startTimeMillis = System.currentTimeMillis();
final List<FileStatus> listedFileStatuses =
DeltaLogActionUtils.listDeltaLogFiles(
engine,
new HashSet<>(Arrays.asList(DeltaLogFileType.COMMIT, DeltaLogFileType.CHECKPOINT)),
tablePath,
listFromStartVersion,
versionToLoadOpt,
true /* mustBeRecreatable */);
DeltaLogActionUtils.listDeltaLogFilesAsIter(
engine,
new HashSet<>(Arrays.asList(DeltaLogFileType.COMMIT, DeltaLogFileType.CHECKPOINT)),
tablePath,
listFromStartVersion,
versionToLoadOpt,
true /* mustBeRecreatable */)
.toInMemoryList();

logger.info(
"{}: Took {}ms to list the files after starting checkpoint",
tablePath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import io.delta.kernel.internal.util.Utils;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

Expand All @@ -36,6 +39,31 @@
@Evolving
public interface CloseableIterator<T> extends Iterator<T>, Closeable {

/**
* Represents the result of applying the filter condition in the {@link
* #breakableFilter(Function)} method of a {@link CloseableIterator}. This enum determines how
* each element in the iterator should be handled.
*/
enum BreakableFilterResult {
/**
* Indicates that the current element should be included in the resulting iterator produced by
* {@link #breakableFilter(Function)}.
*/
INCLUDE,

/**
* Indicates that the current element should be excluded from the resulting iterator produced by
* {@link #breakableFilter(Function)}.
*/
EXCLUDE,

/**
* Indicates that the iteration should stop immediately and that no further elements should be
* processed by {@link #breakableFilter(Function)}.
*/
BREAK
}

/**
* Returns true if the iteration has more elements. (In other words, returns true if next would
* return an element rather than throwing an exception.)
Expand Down Expand Up @@ -91,23 +119,81 @@ public void close() throws IOException {
};
}

/**
* Returns a new {@link CloseableIterator} that includes only the elements of this iterator for
* which the given {@code mapper} function returns {@code true}.
*
* @param mapper A function that determines whether an element should be included in the resulting
* iterator.
* @return A {@link CloseableIterator} that includes only the filtered the elements of this
* iterator.
*/
default CloseableIterator<T> filter(Function<T, Boolean> mapper) {
return breakableFilter(
t -> {
if (mapper.apply(t)) {
return BreakableFilterResult.INCLUDE;
} else {
return BreakableFilterResult.EXCLUDE;
}
});
}

/**
* Returns a new {@link CloseableIterator} that includes elements from this iterator as long as
* the given {@code mapper} function returns {@code true}. Once the mapper function returns {@code
* false}, the iteration is terminated.
*
* @param mapper A function that determines whether to include an element in the resulting
* iterator.
* @return A {@link CloseableIterator} that stops iteration when the condition is not met.
*/
default CloseableIterator<T> takeWhile(Function<T, Boolean> mapper) {
return breakableFilter(
t -> {
if (mapper.apply(t)) {
return BreakableFilterResult.INCLUDE;
} else {
return BreakableFilterResult.BREAK;
}
});
}

/**
* Returns a new {@link CloseableIterator} that applies a {@link BreakableFilterResult}-based
* filtering function to determine whether elements of this iterator should be included or
* excluded, or whether the iteration should terminate.
*
* @param mapper A function that determines the filtering action for each element: include,
* exclude, or break.
* @return A {@link CloseableIterator} that applies the specified {@link
* BreakableFilterResult}-based logic.
*/
default CloseableIterator<T> breakableFilter(Function<T, BreakableFilterResult> mapper) {
CloseableIterator<T> delegate = this;
return new CloseableIterator<T>() {
T next;
boolean hasLoadedNext;
boolean shouldBreak = false;

@Override
public boolean hasNext() {
if (shouldBreak) {
return false;
}
if (hasLoadedNext) {
return true;
}
while (delegate.hasNext()) {
T potentialNext = delegate.next();
if (mapper.apply(potentialNext)) {
final T potentialNext = delegate.next();
final BreakableFilterResult result = mapper.apply(potentialNext);
if (result == BreakableFilterResult.INCLUDE) {
next = potentialNext;
hasLoadedNext = true;
return true;
} else if (result == BreakableFilterResult.BREAK) {
shouldBreak = true;
return false;
}
}
return false;
Expand Down Expand Up @@ -160,4 +246,26 @@ public void close() throws IOException {
}
};
}

/**
* Collects all elements from this {@link CloseableIterator} into a {@link List}.
*
* <p>This method iterates through all elements of the iterator, storing them in an in-memory
* list. Once iteration is complete, the iterator is automatically closed to release any
* underlying resources.
*
* @return A {@link List} containing all elements from this iterator.
* @throws UncheckedIOException If an {@link IOException} occurs while closing the iterator.
*/
default List<T> toInMemoryList() {
final List<T> result = new ArrayList<>();
try (CloseableIterator<T> iterator = this) {
while (iterator.hasNext()) {
result.add(iterator.next());
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to close the CloseableIterator", e);
}
return result;
}
}
Loading

0 comments on commit 75d6b8e

Please sign in to comment.