Skip to content

Commit

Permalink
[core] fix bug when reading changelog's index manifest (#5058)
Browse files Browse the repository at this point in the history
  • Loading branch information
wwj6591812 authored Feb 12, 2025
1 parent e7ccd39 commit 84d48bf
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,14 @@ private List<DataSplit> generateSplits(
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
List<DataSplit> splits = new ArrayList<>();
// Read deletion indexes at once to reduce file IO
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> deletionIndexFilesMap =
deletionVectors && snapshot != null
? indexFileHandler.scan(
snapshot, DELETION_VECTORS_INDEX, groupedDataFiles.keySet())
: Collections.emptyMap();
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> deletionIndexFilesMap = null;
if (!isStreaming) {
deletionIndexFilesMap =
deletionVectors && snapshot != null
? indexFileHandler.scan(
snapshot, DELETION_VECTORS_INDEX, groupedDataFiles.keySet())
: Collections.emptyMap();
}
for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
groupedDataFiles.entrySet()) {
BinaryRow partition = entry.getKey();
Expand All @@ -347,18 +350,19 @@ private List<DataSplit> generateSplits(
isStreaming
? splitGenerator.splitForStreaming(bucketFiles)
: splitGenerator.splitForBatch(bucketFiles);
List<IndexFileMeta> deletionIndexFiles =
deletionIndexFilesMap.getOrDefault(
Pair.of(partition, bucket), Collections.emptyList());
for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
List<DataFileMeta> dataFiles = splitGroup.files;
String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
builder.withDataFiles(dataFiles)
.rawConvertible(splitGroup.rawConvertible)
.withBucketPath(bucketPath);
if (deletionVectors) {
if (deletionVectors && deletionIndexFilesMap != null) {
builder.withDataDeletionFiles(
getDeletionFiles(dataFiles, deletionIndexFiles));
getDeletionFiles(
dataFiles,
deletionIndexFilesMap.getOrDefault(
Pair.of(partition, bucket),
Collections.emptyList())));
}

splits.add(builder.build());
Expand Down Expand Up @@ -419,16 +423,20 @@ private Plan toChangesPlan(
buckets.computeIfAbsent(part, k -> new HashSet<>())
.addAll(bucketMap.keySet()));
// Read deletion indexes at once to reduce file IO
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> beforDeletionIndexFilesMap =
deletionVectors
? indexFileHandler.scan(
beforeSnapshot, DELETION_VECTORS_INDEX, beforeFiles.keySet())
: Collections.emptyMap();
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> deletionIndexFilesMap =
deletionVectors
? indexFileHandler.scan(
snapshot, DELETION_VECTORS_INDEX, dataFiles.keySet())
: Collections.emptyMap();
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> beforDeletionIndexFilesMap = null;
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> deletionIndexFilesMap = null;
if (!isStreaming) {
beforDeletionIndexFilesMap =
deletionVectors
? indexFileHandler.scan(
beforeSnapshot, DELETION_VECTORS_INDEX, beforeFiles.keySet())
: Collections.emptyMap();
deletionIndexFilesMap =
deletionVectors
? indexFileHandler.scan(
snapshot, DELETION_VECTORS_INDEX, dataFiles.keySet())
: Collections.emptyMap();
}

for (Map.Entry<BinaryRow, Set<Integer>> entry : buckets.entrySet()) {
BinaryRow part = entry.getKey();
Expand All @@ -454,7 +462,9 @@ private Plan toChangesPlan(
.withDataFiles(data)
.isStreaming(isStreaming)
.withBucketPath(pathFactory.bucketPath(part, bucket).toString());
if (deletionVectors) {
if (deletionVectors
&& beforDeletionIndexFilesMap != null
&& deletionIndexFilesMap != null) {
builder.withBeforeDeletionFiles(
getDeletionFiles(
before,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
Expand Down Expand Up @@ -107,6 +108,7 @@
import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.COMPACTION_MAX_FILE_NUM;
import static org.apache.paimon.CoreOptions.CONSUMER_IGNORE_PROGRESS;
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.SNAPSHOT_CLEAN_EMPTY_DIRECTORIES;
Expand Down Expand Up @@ -1652,6 +1654,60 @@ public void testBranchWriteAndRead() throws Exception {
"4|40|400|binary|varbinary|mapKey:mapVal|multiset");
}

@Test
public void testDataSplitNotIncludeDvFilesWhenStreamingRead() throws Exception {
FileStoreTable table = createFileStoreTable();
Map<String, String> options = new HashMap<>();
options.put(DELETION_VECTORS_ENABLED.key(), "true");
options.put(WRITE_ONLY.key(), "true");
table = table.copy(options);

try (StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser)) {
for (int i = 0; i < 10; i++) {
write.write(rowData(i, 10 * i, 100L * i));
commit.commit(i, write.prepareCommit(false, i));
}
}

List<Split> splits =
toSplits(table.newSnapshotReader().withMode(ScanMode.DELTA).read().dataSplits());

for (Split split : splits) {
DataSplit dataSplit = (DataSplit) split;
Assertions.assertThat(dataSplit.deletionFiles().isPresent()).isFalse();
}
}

@Test
public void testDataSplitNotIncludeDvFilesWhenStreamingReadChanges() throws Exception {
FileStoreTable table = createFileStoreTable();
Map<String, String> options = new HashMap<>();
options.put(DELETION_VECTORS_ENABLED.key(), "true");
options.put(WRITE_ONLY.key(), "true");
table = table.copy(options);

try (StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser)) {
for (int i = 0; i < 10; i++) {
write.write(rowData(i, 10 * i, 100L * i));
commit.commit(i, write.prepareCommit(false, i));
}
}

List<Split> splits =
toSplits(
table.newSnapshotReader()
.withMode(ScanMode.DELTA)
.readChanges()
.dataSplits());

for (Split split : splits) {
DataSplit dataSplit = (DataSplit) split;
Assertions.assertThat(dataSplit.deletionFiles().isPresent()).isFalse();
}
}

protected List<String> getResult(
TableRead read,
List<Split> splits,
Expand Down

0 comments on commit 84d48bf

Please sign in to comment.