Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] Minor refactor to DeltaLogActionUtils; add CloseableIterator takeWhile and other helpful methods #4097

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
import io.delta.kernel.internal.util.FileNames.DeltaLogFileType;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.CloseableIterator.BreakableFilterResult;
import io.delta.kernel.utils.FileStatus;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -103,13 +105,14 @@ public static List<FileStatus> getCommitFilesForVersionRange(

// Get any available commit files within the version range
final List<FileStatus> commitFiles =
listDeltaLogFiles(
engine,
Collections.singleton(DeltaLogFileType.COMMIT),
tablePath,
startVersion,
Optional.of(endVersion),
false /* mustBeRecreatable */);
listDeltaLogFilesAsIter(
engine,
Collections.singleton(DeltaLogFileType.COMMIT),
tablePath,
startVersion,
Optional.of(endVersion),
false /* mustBeRecreatable */)
.toInMemoryList();

// There are no available commit files within the version range.
// This can be due to (1) an empty directory, (2) no valid delta files in the directory,
Expand Down Expand Up @@ -181,14 +184,14 @@ public static CloseableIterator<ColumnarBatch> readCommitFiles(
}

/**
* Returns the list of files of type $fileTypes in the _delta_log directory of the given
* $tablePath, in increasing order from $startVersion to the optional $endVersion.
* Returns a {@link CloseableIterator} of files of type $fileTypes in the _delta_log directory of
* the given $tablePath, in increasing order from $startVersion to the optional $endVersion.
*
* @throws TableNotFoundException if the table or its _delta_log does not exist
* @throws KernelException if mustBeRecreatable is true, endVersionOpt is present, and the
* _delta_log history has been truncated so that we cannot load the desired end version
*/
public static List<FileStatus> listDeltaLogFiles(
public static CloseableIterator<FileStatus> listDeltaLogFilesAsIter(
Engine engine,
Set<DeltaLogFileType> fileTypes,
Path tablePath,
Expand All @@ -215,69 +218,58 @@ public static List<FileStatus> listDeltaLogFiles(
startVersion,
endVersionOpt);

final List<FileStatus> output = new ArrayList<>();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

verbatim translate this logic to use the breakableFilter API instead

final long startTimeMillis = System.currentTimeMillis();

try (CloseableIterator<FileStatus> fsIter = listLogDir(engine, tablePath, startVersion)) {
while (fsIter.hasNext()) {
final FileStatus fs = fsIter.next();

if (fileTypes.contains(DeltaLogFileType.COMMIT)
&& FileNames.isCommitFile(getName(fs.getPath()))) {
// Here, we do nothing (we will consume this file).
} else if (fileTypes.contains(DeltaLogFileType.CHECKPOINT)
&& FileNames.isCheckpointFile(getName(fs.getPath()))
&& fs.getSize() > 0) {
// Checkpoint files of 0 size are invalid but may be ignored silently when read, hence we
// ignore them so that we never pick up such checkpoints.
// Here, we do nothing (we will consume this file).
} else {
logger.debug("Ignoring file {} as it is not of the desired type", fs.getPath());
continue; // Here, we continue and skip this file.
}

final long fileVersion = FileNames.getFileVersion(new Path(fs.getPath()));

if (fileVersion < startVersion) {
throw new RuntimeException(
String.format(
"Listing files in %s with startVersion %s yet found file %s with version %s",
logPath, startVersion, fs.getPath(), fileVersion));
}

if (endVersionOpt.isPresent()) {
final long endVersion = endVersionOpt.get();

if (fileVersion > endVersion) {
if (mustBeRecreatable && output.isEmpty()) {
final long earliestVersion =
DeltaHistoryManager.getEarliestRecreatableCommit(engine, logPath);
throw DeltaErrors.versionBeforeFirstAvailableCommit(
tablePath.toString(), endVersion, earliestVersion);
} else {
logger.debug(
"Stopping listing; found file {} with version > {}=endVersion",
fs.getPath(),
endVersion);
break;
}
}
}

output.add(fs);
}
} catch (IOException e) {
throw new UncheckedIOException("Unable to close resource", e);
}

logger.info(
"{}: Took {} ms to list the commit files for versions [{}, {}]",
tablePath,
System.currentTimeMillis() - startTimeMillis,
startVersion,
endVersionOpt);

return output;
// Must be final to be used in lambda
final AtomicBoolean hasReturnedAnElement = new AtomicBoolean(false);

return listLogDir(engine, tablePath, startVersion)
.breakableFilter(
fs -> {
if (fileTypes.contains(DeltaLogFileType.COMMIT)
&& FileNames.isCommitFile(getName(fs.getPath()))) {
// Here, we do nothing (we will consume this file).
} else if (fileTypes.contains(DeltaLogFileType.CHECKPOINT)
&& FileNames.isCheckpointFile(getName(fs.getPath()))
&& fs.getSize() > 0) {
// Checkpoint files of 0 size are invalid but may be ignored silently when read,
// hence we ignore them so that we never pick up such checkpoints.
// Here, we do nothing (we will consume this file).
} else {
logger.debug("Ignoring file {} as it is not of the desired type", fs.getPath());
return BreakableFilterResult.EXCLUDE; // Here, we exclude and filter out this file.
}

final long fileVersion = FileNames.getFileVersion(new Path(fs.getPath()));

if (fileVersion < startVersion) {
throw new RuntimeException(
String.format(
"Listing files in %s with startVersion %s yet found file %s.",
logPath, startVersion, fs.getPath()));
}

if (endVersionOpt.isPresent()) {
final long endVersion = endVersionOpt.get();

if (fileVersion > endVersion) {
if (mustBeRecreatable && !hasReturnedAnElement.get()) {
final long earliestVersion =
DeltaHistoryManager.getEarliestRecreatableCommit(engine, logPath);
throw DeltaErrors.versionBeforeFirstAvailableCommit(
tablePath.toString(), endVersion, earliestVersion);
} else {
logger.debug(
"Stopping listing; found file {} with version greater than endVersion {}",
fs.getPath(),
endVersion);
return BreakableFilterResult.BREAK;
}
}
}

hasReturnedAnElement.set(true);

return BreakableFilterResult.INCLUDE;
});
}

//////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,13 +348,15 @@ public LogSegment getLogSegmentForVersion(Engine engine, Optional<Long> versionT

final long startTimeMillis = System.currentTimeMillis();
final List<FileStatus> listedFileStatuses =
DeltaLogActionUtils.listDeltaLogFiles(
engine,
new HashSet<>(Arrays.asList(DeltaLogFileType.COMMIT, DeltaLogFileType.CHECKPOINT)),
tablePath,
listFromStartVersion,
versionToLoadOpt,
true /* mustBeRecreatable */);
DeltaLogActionUtils.listDeltaLogFilesAsIter(
engine,
new HashSet<>(Arrays.asList(DeltaLogFileType.COMMIT, DeltaLogFileType.CHECKPOINT)),
tablePath,
listFromStartVersion,
versionToLoadOpt,
true /* mustBeRecreatable */)
.toInMemoryList();

logger.info(
"{}: Took {}ms to list the files after starting checkpoint",
tablePath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import io.delta.kernel.internal.util.Utils;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

Expand All @@ -36,6 +39,31 @@
@Evolving
public interface CloseableIterator<T> extends Iterator<T>, Closeable {

/**
* Represents the result of applying the filter condition in the {@link
* #breakableFilter(Function)} method of a {@link CloseableIterator}. This enum determines how
* each element in the iterator should be handled.
*/
enum BreakableFilterResult {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a common paradigm? It seems like breakableFilter could also be a combo of filter + takeWhile. But I see those are implemented using this so just curious on your reasoning

Copy link
Collaborator Author

@scottsand-db scottsand-db Jan 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

breakableFilter could also be a combo of filter + takeWhile

Are you saying that we can implement breakableFilter by using a combination of both filter + takeWhile?

One of my tests has an example of breakableFilter: we want to: INCLUDE 1, EXCLUDE 2, INCLUDE 3, and BREAK at 4. After thinking about this for > 30 seconds, maybe there is a way to implement this using filter + takeWhile, but it doesn't jump out at me.

I think the semantics I lay out here are simpler than using .filter(....).takeWhile(...). For example, the implementation I did in DeltaLogActionUtils is cleanly implemented this way.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    val result = throwingDataIter.breakableFilter { x =>
      if (x <= 1 || x == 3) {
        BreakableFilterResult.INCLUDE
      } else if (x == 2) {
        BreakableFilterResult.EXCLUDE
      } else if (x == 4) {
        BreakableFilterResult.BREAK
      } else {
        throw new RuntimeException("This should never be reached")
      }
    }

Wouldn't the test case just be? They seem equivalent to me

iter.filter( x => x != 2).takeWhile(x != 4)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Equivalently the snapshot manager code would probably be

    // Must be final to be used in lambda
    final AtomicBoolean hasReturnedAnElement = new AtomicBoolean(false);

    return listLogDir(engine, tablePath, startVersion)
        .filter(
            fs -> {
              if (fileTypes.contains(DeltaLogFileType.COMMIT)
                  && FileNames.isCommitFile(getName(fs.getPath()))) {
                return true
              } else if (fileTypes.contains(DeltaLogFileType.CHECKPOINT)
                  && FileNames.isCheckpointFile(getName(fs.getPath()))
                  && fs.getSize() > 0) {
                return true
              } else {
                return false
              }
           }
      .takeWhile(
          fs -> {
              final long fileVersion = FileNames.getFileVersion(new Path(fs.getPath()));

              if (fileVersion < startVersion) {
                throw new RuntimeException(
                    String.format(
                        "Listing files in %s with startVersion %s yet found file %s.",
                        logPath, startVersion, fs.getPath()));
              }

              if (endVersionOpt.isPresent()) {
                final long endVersion = endVersionOpt.get();

                if (fileVersion > endVersion) {
                  if (mustBeRecreatable && !hasReturnedAnElement.get()) {
                    final long earliestVersion =
                        DeltaHistoryManager.getEarliestRecreatableCommit(engine, logPath);
                    throw DeltaErrors.versionBeforeFirstAvailableCommit(
                        tablePath.toString(), endVersion, earliestVersion);
                  } else {
                    logger.debug(
                        "Stopping listing; found file {} with version greater than endVersion {}",
                        fs.getPath(),
                        endVersion);
                    return false;
                  }
                }
              }

              hasReturnedAnElement.set(true);

              return true;
            });

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually kind of like it; since it nicely delineates the filter mechanism versus the terminate the loop (takeWhile) mechanism. But they honestly are super similar so I don't feel too strongly about it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iter.filter( x => x != 2).takeWhile(x != 4) --> ah, I understand now, thanks for showing that.

I actually kind of like it; since it nicely delineates the filter mechanism versus the terminate the loop (takeWhile) mechanism.

I actually still prefer the current version. When they are separate, I need to understand 4 return values: 2 for the filter, and 2 for the takeWhile. It also introduces some coupling -- the takeWhile logic is inherintly tied to the filter logic -- e.g. it would we weird to filter(x -> x < 10) and then takeWhile(x > 20). IMO when it's all together, I think it's easier to parse the logic.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The good news is that this CloseableIterator API doesn't preclude your suggestion. i.e. other areas in the code are free to do .filter and then .takeWhile if we decide later on that that is best there.

Overall -- I'm inclined to stick with what I have now. Happy to refactor it laterr -- but really I'd like to unblock @huan233usc 's PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I just hesitate to add a new concept of this breakable filter when it can be distilled into two existing things. But I agree it's easy to understand so not a big deal

/**
* Indicates that the current element should be included in the resulting iterator produced by
* {@link #breakableFilter(Function)}.
*/
INCLUDE,

/**
* Indicates that the current element should be excluded from the resulting iterator produced by
* {@link #breakableFilter(Function)}.
*/
EXCLUDE,

/**
* Indicates that the iteration should stop immediately and that no further elements should be
* processed by {@link #breakableFilter(Function)}.
*/
BREAK
}

/**
* Returns true if the iteration has more elements. (In other words, returns true if next would
* return an element rather than throwing an exception.)
Expand Down Expand Up @@ -91,23 +119,81 @@ public void close() throws IOException {
};
}

/**
* Returns a new {@link CloseableIterator} that includes only the elements of this iterator for
* which the given {@code mapper} function returns {@code true}.
*
* @param mapper A function that determines whether an element should be included in the resulting
* iterator.
* @return A {@link CloseableIterator} that includes only the filtered the elements of this
* iterator.
*/
default CloseableIterator<T> filter(Function<T, Boolean> mapper) {
return breakableFilter(
t -> {
if (mapper.apply(t)) {
return BreakableFilterResult.INCLUDE;
} else {
return BreakableFilterResult.EXCLUDE;
}
});
}

/**
* Returns a new {@link CloseableIterator} that includes elements from this iterator as long as
* the given {@code mapper} function returns {@code true}. Once the mapper function returns {@code
* false}, the iteration is terminated.
*
* @param mapper A function that determines whether to include an element in the resulting
* iterator.
* @return A {@link CloseableIterator} that stops iteration when the condition is not met.
*/
default CloseableIterator<T> takeWhile(Function<T, Boolean> mapper) {
return breakableFilter(
t -> {
if (mapper.apply(t)) {
return BreakableFilterResult.INCLUDE;
} else {
return BreakableFilterResult.BREAK;
}
});
}

/**
* Returns a new {@link CloseableIterator} that applies a {@link BreakableFilterResult}-based
* filtering function to determine whether elements of this iterator should be included or
* excluded, or whether the iteration should terminate.
*
* @param mapper A function that determines the filtering action for each element: include,
* exclude, or break.
* @return A {@link CloseableIterator} that applies the specified {@link
* BreakableFilterResult}-based logic.
*/
default CloseableIterator<T> breakableFilter(Function<T, BreakableFilterResult> mapper) {
CloseableIterator<T> delegate = this;
return new CloseableIterator<T>() {
T next;
boolean hasLoadedNext;
boolean shouldBreak = false;

@Override
public boolean hasNext() {
if (shouldBreak) {
return false;
}
if (hasLoadedNext) {
return true;
}
while (delegate.hasNext()) {
T potentialNext = delegate.next();
if (mapper.apply(potentialNext)) {
final T potentialNext = delegate.next();
final BreakableFilterResult result = mapper.apply(potentialNext);
if (result == BreakableFilterResult.INCLUDE) {
next = potentialNext;
hasLoadedNext = true;
return true;
} else if (result == BreakableFilterResult.BREAK) {
shouldBreak = true;
return false;
}
}
return false;
Expand Down Expand Up @@ -160,4 +246,26 @@ public void close() throws IOException {
}
};
}

/**
* Collects all elements from this {@link CloseableIterator} into a {@link List}.
*
* <p>This method iterates through all elements of the iterator, storing them in an in-memory
* list. Once iteration is complete, the iterator is automatically closed to release any
* underlying resources.
*
* @return A {@link List} containing all elements from this iterator.
* @throws UncheckedIOException If an {@link IOException} occurs while closing the iterator.
*/
default List<T> toInMemoryList() {
final List<T> result = new ArrayList<>();
try (CloseableIterator<T> iterator = this) {
while (iterator.hasNext()) {
result.add(iterator.next());
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to close the CloseableIterator", e);
}
return result;
}
}
Loading
Loading