[Kernel] Minor refactor to DeltaLogActionUtils; add CloseableIterator takeWhile and other helpful methods #4097

Merged

Conversation

scottsand-db
Collaborator

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This PR does the following:

  • changes the DeltaLogActionUtils listDeltaLogFiles method to return an iterator. This will let a follow-up PR use it to further consolidate and clean up code in DeltaHistoryManager (which lists the delta log and expects an iterator)
  • adds CloseableIterator::takeWhile, breakableFilter, and toInMemoryList
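
For readers unfamiliar with the three helpers, here is a minimal sketch of the semantics discussed in this PR, written against plain java.util.Iterator. It is not the Kernel implementation: the class IteratorSketch and its static helpers are hypothetical stand-ins, resource closing is omitted, and only the enum values INCLUDE, EXCLUDE, and BREAK are taken from the conversation below.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for illustration only; not the Kernel's CloseableIterator.
class IteratorSketch {
  enum BreakableFilterResult { INCLUDE, EXCLUDE, BREAK }

  // Lazily yields INCLUDE elements, skips EXCLUDE elements, stops for good at the first BREAK.
  static <T> Iterator<T> breakableFilter(
      Iterator<T> source, Function<T, BreakableFilterResult> mapper) {
    return new Iterator<T>() {
      private T nextElement;
      private boolean loaded = false; // true when nextElement holds an unreturned element
      private boolean done = false;   // true once BREAK has been seen

      @Override
      public boolean hasNext() {
        while (!loaded && !done && source.hasNext()) {
          T candidate = source.next();
          BreakableFilterResult result = mapper.apply(candidate);
          if (result == BreakableFilterResult.INCLUDE) {
            nextElement = candidate;
            loaded = true;
          } else if (result == BreakableFilterResult.BREAK) {
            done = true;
          }
          // EXCLUDE: fall through and look at the next source element.
        }
        return loaded;
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        loaded = false;
        T result = nextElement;
        nextElement = null;
        return result;
      }
    };
  }

  // takeWhile expressed through breakableFilter: the first failing element ends iteration.
  static <T> Iterator<T> takeWhile(Iterator<T> source, Predicate<T> predicate) {
    return breakableFilter(
        source,
        x -> predicate.test(x) ? BreakableFilterResult.INCLUDE : BreakableFilterResult.BREAK);
  }

  // Drains the iterator into a list; only sensible when the data fits in memory.
  static <T> List<T> toInMemoryList(Iterator<T> source) {
    List<T> out = new ArrayList<>();
    while (source.hasNext()) {
      out.add(source.next());
    }
    return out;
  }
}
```

Because the wrapper is lazy, elements after a BREAK are never pulled from the source, which is what lets a caller stop a log listing early.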

How was this patch tested?

  • New UTs

Does this PR introduce any user-facing changes?

No.

@@ -215,69 +218,59 @@ public static List<FileStatus> listDeltaLogFiles(
startVersion,
endVersionOpt);

final List<FileStatus> output = new ArrayList<>();
Collaborator Author

verbatim translate this logic to use the breakableFilter API instead

Collaborator

@huan233usc huan233usc left a comment

LGTM once the under-the-radar doc nit is fixed (https://github.com/delta-io/delta/pull/4097/files#r1933026458)

@scottsand-db scottsand-db force-pushed the kernel_closeable_iterator_refactor branch from 749b8a0 to 424241f Compare January 29, 2025 03:53
Collaborator

@allisonport-db allisonport-db left a comment

just 1 tiny comment + a question

* #breakableFilter(Function)} method of a {@link CloseableIterator}. This enum determines how
* each element in the iterator should be handled.
*/
enum BreakableFilterResult {
Collaborator

Is this a common paradigm? It seems like breakableFilter could also be a combo of filter + takeWhile. But I see those are implemented using this so just curious on your reasoning

Collaborator Author

@scottsand-db scottsand-db Jan 30, 2025

breakableFilter could also be a combo of filter + takeWhile

Are you saying that we can implement breakableFilter by using a combination of both filter + takeWhile?

One of my tests has an example of breakableFilter: we want to: INCLUDE 1, EXCLUDE 2, INCLUDE 3, and BREAK at 4. After thinking about this for > 30 seconds, maybe there is a way to implement this using filter + takeWhile, but it doesn't jump out at me.

I think the semantics I lay out here are simpler than using .filter(....).takeWhile(...). For example, the implementation I did in DeltaLogActionUtils is cleanly implemented this way.

Collaborator

    val result = throwingDataIter.breakableFilter { x =>
      if (x <= 1 || x == 3) {
        BreakableFilterResult.INCLUDE
      } else if (x == 2) {
        BreakableFilterResult.EXCLUDE
      } else if (x == 4) {
        BreakableFilterResult.BREAK
      } else {
        throw new RuntimeException("This should never be reached")
      }
    }

Wouldn't the test case just be the following? They seem equivalent to me.

iter.filter(x => x != 2).takeWhile(x => x != 4)
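
To make the claimed equivalence concrete, the sketch below runs the quoted test-case classifier as a single-pass loop and compares it with filter followed by takeWhile on java.util.stream (Stream.takeWhile requires Java 9 or later). The class and method names are hypothetical and exist only for this illustration; the Kernel iterator API is not used here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class; names are illustrative, not Kernel code.
class BreakableFilterEquivalence {
  enum Result { INCLUDE, EXCLUDE, BREAK }

  // Same decisions as the classifier in the test case quoted above.
  static Result classify(int x) {
    if (x <= 1 || x == 3) {
      return Result.INCLUDE;
    } else if (x == 2) {
      return Result.EXCLUDE;
    } else if (x == 4) {
      return Result.BREAK;
    }
    throw new RuntimeException("This should never be reached");
  }

  // Eager, single-pass rendering of the breakableFilter semantics.
  static List<Integer> viaBreakableFilter(List<Integer> input) {
    List<Integer> out = new ArrayList<>();
    for (int x : input) {
      Result result = classify(x);
      if (result == Result.BREAK) {
        break; // nothing after this element is inspected
      }
      if (result == Result.INCLUDE) {
        out.add(x);
      }
    }
    return out;
  }

  // The two-step form suggested in the review: drop 2, then stop at 4.
  static List<Integer> viaFilterThenTakeWhile(List<Integer> input) {
    return input.stream()
        .filter(x -> x != 2)
        .takeWhile(x -> x != 4)
        .collect(Collectors.toList());
  }
}
```

For the input 1, 2, 3, 4, 5 both methods return the list 1, 3, and the element 5 is never classified in the loop version.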

Collaborator

Equivalently the snapshot manager code would probably be

    // Must be final to be used in lambda
    final AtomicBoolean hasReturnedAnElement = new AtomicBoolean(false);

    return listLogDir(engine, tablePath, startVersion)
        .filter(
            fs -> {
              if (fileTypes.contains(DeltaLogFileType.COMMIT)
                  && FileNames.isCommitFile(getName(fs.getPath()))) {
                return true;
              } else if (fileTypes.contains(DeltaLogFileType.CHECKPOINT)
                  && FileNames.isCheckpointFile(getName(fs.getPath()))
                  && fs.getSize() > 0) {
                return true;
              } else {
                return false;
              }
            })
        .takeWhile(
            fs -> {
              final long fileVersion = FileNames.getFileVersion(new Path(fs.getPath()));

              if (fileVersion < startVersion) {
                throw new RuntimeException(
                    String.format(
                        "Listing files in %s with startVersion %s yet found file %s.",
                        logPath, startVersion, fs.getPath()));
              }

              if (endVersionOpt.isPresent()) {
                final long endVersion = endVersionOpt.get();

                if (fileVersion > endVersion) {
                  if (mustBeRecreatable && !hasReturnedAnElement.get()) {
                    final long earliestVersion =
                        DeltaHistoryManager.getEarliestRecreatableCommit(engine, logPath);
                    throw DeltaErrors.versionBeforeFirstAvailableCommit(
                        tablePath.toString(), endVersion, earliestVersion);
                  } else {
                    logger.debug(
                        "Stopping listing; found file {} with version greater than endVersion {}",
                        fs.getPath(),
                        endVersion);
                    return false;
                  }
                }
              }

              hasReturnedAnElement.set(true);

              return true;
            });

Collaborator

I actually kind of like it; since it nicely delineates the filter mechanism versus the terminate the loop (takeWhile) mechanism. But they honestly are super similar so I don't feel too strongly about it.

Collaborator Author

iter.filter(x => x != 2).takeWhile(x => x != 4) --> ah, I understand now, thanks for showing that.

I actually kind of like it; since it nicely delineates the filter mechanism versus the terminate the loop (takeWhile) mechanism.

I actually still prefer the current version. When they are separate, I need to understand 4 return values: 2 for the filter, and 2 for the takeWhile. It also introduces some coupling -- the takeWhile logic is inherently tied to the filter logic -- e.g. it would be weird to filter(x -> x < 10) and then takeWhile(x -> x > 20). IMO when it's all together, it's easier to parse the logic.
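
As a small illustration of the coupling point, the sketch below chains the two predicates from the example using java.util.stream (Stream.takeWhile requires Java 9 or later). The class name CoupledPredicates is hypothetical. Every element that survives the filter is below 10, so the takeWhile predicate fails on the first one and nothing is emitted.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class showing how the second predicate depends on the first.
class CoupledPredicates {
  static List<Integer> run(Stream<Integer> input) {
    return input
        .filter(x -> x < 10)     // only values below 10 reach takeWhile
        .takeWhile(x -> x > 20)  // can never hold for a value below 10
        .collect(Collectors.toList());
  }
}
```

Calling run(Stream.of(5, 25, 30)) yields an empty list, even though two of the inputs satisfy the second predicate on their own.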

Collaborator Author

The good news is that this CloseableIterator API doesn't preclude your suggestion. i.e. other areas in the code are free to do .filter and then .takeWhile if we decide later on that that is best there.

Overall -- I'm inclined to stick with what I have now. Happy to refactor it later -- but really I'd like to unblock @huan233usc's PR.

Collaborator

Sounds good

Collaborator

I guess I just hesitate to add a new concept of this breakable filter when it can be distilled into two existing things. But I agree it's easy to understand so not a big deal

@scottsand-db scottsand-db merged commit 75d6b8e into delta-io:master Jan 30, 2025
18 of 19 checks passed

3 participants