Skip to content

Commit

Permalink
[core] Refactor code in AbstractFileStoreScan.plan
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Sep 26, 2024
1 parent 722994e commit 703a2a2
Showing 1 changed file with 48 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,22 +228,65 @@ public ManifestsReader manifestsReader() {

@Override
public Plan plan() {
Pair<Snapshot, List<ManifestEntry>> planResult = doPlan();
long started = System.nanoTime();
ManifestsReader.Result manifestsResult = readManifests();
Snapshot snapshot = manifestsResult.snapshot;
List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;

long startDataFiles =
manifestsResult.allManifests.stream()
.mapToLong(f -> f.numAddedFiles() - f.numDeletedFiles())
.sum();

Collection<ManifestEntry> mergedEntries =
readAndMergeFileEntries(manifests, this::readManifest);

long skippedByPartitionAndStats = startDataFiles - mergedEntries.size();

// We group files by bucket here, and filter them by the whole bucket filter.
// Why do this: because in primary key table, we can't just filter the value
// by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
// but we can do this by filter the whole bucket files
List<ManifestEntry> files =
mergedEntries.stream()
.collect(
Collectors.groupingBy(
// we use LinkedHashMap to avoid disorder
file -> Pair.of(file.partition(), file.bucket()),
LinkedHashMap::new,
Collectors.toList()))
.values()
.stream()
.map(this::filterWholeBucketByStats)
.flatMap(Collection::stream)
.collect(Collectors.toList());

final Snapshot readSnapshot = planResult.getLeft();
final List<ManifestEntry> files = planResult.getRight();
long skippedByWholeBucketFiles = mergedEntries.size() - files.size();
long scanDuration = (System.nanoTime() - started) / 1_000_000;
checkState(
startDataFiles - skippedByPartitionAndStats - skippedByWholeBucketFiles
== files.size());
if (scanMetrics != null) {
scanMetrics.reportScan(
new ScanStats(
scanDuration,
manifests.size(),
skippedByPartitionAndStats,
skippedByWholeBucketFiles,
files.size()));
}

return new Plan() {
@Nullable
@Override
public Long watermark() {
return readSnapshot == null ? null : readSnapshot.watermark();
return snapshot == null ? null : snapshot.watermark();
}

@Nullable
@Override
public Snapshot snapshot() {
return readSnapshot;
return snapshot;
}

@Override
Expand Down Expand Up @@ -300,57 +343,6 @@ public Iterator<ManifestEntry> readFileIterator() {
&& !deleteEntries.contains(entry.identifier()));
}

private Pair<Snapshot, List<ManifestEntry>> doPlan() {
long started = System.nanoTime();
ManifestsReader.Result manifestsResult = readManifests();
Snapshot snapshot = manifestsResult.snapshot;
List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;

long startDataFiles =
manifestsResult.allManifests.stream()
.mapToLong(f -> f.numAddedFiles() - f.numDeletedFiles())
.sum();

Collection<ManifestEntry> mergedEntries =
readAndMergeFileEntries(manifests, this::readManifest);

long skippedByPartitionAndStats = startDataFiles - mergedEntries.size();

// We group files by bucket here, and filter them by the whole bucket filter.
// Why do this: because in primary key table, we can't just filter the value
// by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
// but we can do this by filter the whole bucket files
List<ManifestEntry> files =
mergedEntries.stream()
.collect(
Collectors.groupingBy(
// we use LinkedHashMap to avoid disorder
file -> Pair.of(file.partition(), file.bucket()),
LinkedHashMap::new,
Collectors.toList()))
.values()
.stream()
.map(this::filterWholeBucketByStats)
.flatMap(Collection::stream)
.collect(Collectors.toList());

long skippedByWholeBucketFiles = mergedEntries.size() - files.size();
long scanDuration = (System.nanoTime() - started) / 1_000_000;
checkState(
startDataFiles - skippedByPartitionAndStats - skippedByWholeBucketFiles
== files.size());
if (scanMetrics != null) {
scanMetrics.reportScan(
new ScanStats(
scanDuration,
manifests.size(),
skippedByPartitionAndStats,
skippedByWholeBucketFiles,
files.size()));
}
return Pair.of(snapshot, files);
}

public <T extends FileEntry> Collection<T> readAndMergeFileEntries(
List<ManifestFileMeta> manifests, Function<ManifestFileMeta, List<T>> manifestReader) {
return FileEntry.mergeEntries(
Expand Down

0 comments on commit 703a2a2

Please sign in to comment.