Skip to content

Commit

Permalink
[Kernel] During Active-AddFile-Log-Replay do not pass the RemoveFile …
Browse files Browse the repository at this point in the history
…to checkpoint reader

#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

Implements a minor performance improvement: skip reading the `remove` column when reading checkpoint Parquet files during active-AddFile log replay, since RemoveFile actions from checkpoints are not needed there.

## How was this patch tested?

Existing unit test, manual test using
delta/kernel/examples/run-kernel-examples.py --use-local

## Does this PR introduce _any_ user-facing changes?

No
  • Loading branch information
anoopj committed Feb 8, 2025
1 parent 685379c commit a13a5da
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,12 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
checkpointPredicateIncludingSidecars = checkpointPredicate;
}
final CloseableIterator<ColumnarBatch> topLevelIter;
StructType finalModifiedReadSchema = modifiedReadSchema;
StructType finalCommitReadSchema = modifiedReadSchema;
// We do not need to look at any `remove` files from the checkpoints. Skip the column to save
// I/O. Note that we are still going to process the row groups. Adds and removes are randomly
// scattered through checkpoint part files, so row group push down is unlikely to be useful.
StructType finalCheckpointReadSchema = LogReplay.withoutRemoveFileSchema(modifiedReadSchema);

if (fileName.endsWith(".parquet")) {
topLevelIter =
wrapEngineExceptionThrowsIO(
Expand All @@ -204,7 +209,7 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
.getParquetHandler()
.readParquetFiles(
singletonCloseableIterator(file),
finalModifiedReadSchema,
finalCheckpointReadSchema,
checkpointPredicateIncludingSidecars),
"Reading parquet log file `%s` with readSchema=%s and predicate=%s",
file,
Expand All @@ -218,7 +223,7 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
.getJsonHandler()
.readJsonFiles(
singletonCloseableIterator(file),
finalModifiedReadSchema,
finalCommitReadSchema,
checkpointPredicateIncludingSidecars),
"Reading JSON log file `%s` with readSchema=%s and predicate=%s",
file,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,12 @@ private void prepareNext() {
}
}

ColumnarBatch scanAddFiles = addRemoveColumnarBatch;
// Step 3: Drop the RemoveFile column and use the selection vector to build a new
// FilteredColumnarBatch
ColumnarBatch scanAddFiles = addRemoveColumnarBatch.withDeletedColumnAt(1);
if (scanAddFiles.getSchema().length() > 1) {
scanAddFiles = scanAddFiles.withDeletedColumnAt(1);
}

// Step 4: TODO: remove this step. This is a temporary requirement until the path
// in `add` is converted to absolute path.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ public static StructType getAddRemoveReadSchema(boolean shouldReadStats) {
.add(REMOVEFILE_FIELD_NAME, REMOVE_FILE_SCHEMA);
}

/**
 * Returns a copy of the given read schema with the {@code remove} action column
 * (if present) left out. Useful when reading checkpoint files, where RemoveFile
 * entries do not need to be materialized.
 *
 * @param schema the read schema to prune
 * @return a new {@link StructType} containing every field of {@code schema}
 *     except the one named {@code REMOVEFILE_FIELD_NAME}
 */
public static StructType withoutRemoveFileSchema(StructType schema) {
  StructType pruned = new StructType();
  int idx = 0;
  while (idx < schema.length()) {
    // Copy every field except the RemoveFile column.
    boolean isRemoveColumn = schema.at(idx).getName().equals(REMOVEFILE_FIELD_NAME);
    if (!isRemoveColumn) {
      pruned = pruned.add(schema.at(idx));
    }
    idx++;
  }
  return pruned;
}

public static int ADD_FILE_ORDINAL = 0;
public static int ADD_FILE_PATH_ORDINAL = AddFile.SCHEMA_WITHOUT_STATS.indexOf("path");
public static int ADD_FILE_DV_ORDINAL = AddFile.SCHEMA_WITHOUT_STATS.indexOf("deletionVector");
Expand Down

0 comments on commit a13a5da

Please sign in to comment.