diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 032738c38c95..5a4a79e0d702 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -324,11 +324,14 @@ private List generateSplits( Map>> groupedDataFiles) { List splits = new ArrayList<>(); // Read deletion indexes at once to reduce file IO - Map, List> deletionIndexFilesMap = - deletionVectors && snapshot != null - ? indexFileHandler.scan( - snapshot, DELETION_VECTORS_INDEX, groupedDataFiles.keySet()) - : Collections.emptyMap(); + Map, List> deletionIndexFilesMap = null; + if (!isStreaming) { + deletionIndexFilesMap = + deletionVectors && snapshot != null + ? indexFileHandler.scan( + snapshot, DELETION_VECTORS_INDEX, groupedDataFiles.keySet()) + : Collections.emptyMap(); + } for (Map.Entry>> entry : groupedDataFiles.entrySet()) { BinaryRow partition = entry.getKey(); @@ -347,18 +350,19 @@ private List generateSplits( isStreaming ? splitGenerator.splitForStreaming(bucketFiles) : splitGenerator.splitForBatch(bucketFiles); - List deletionIndexFiles = - deletionIndexFilesMap.getOrDefault( - Pair.of(partition, bucket), Collections.emptyList()); for (SplitGenerator.SplitGroup splitGroup : splitGroups) { List dataFiles = splitGroup.files; String bucketPath = pathFactory.bucketPath(partition, bucket).toString(); builder.withDataFiles(dataFiles) .rawConvertible(splitGroup.rawConvertible) .withBucketPath(bucketPath); - if (deletionVectors) { + if (deletionVectors && deletionIndexFilesMap != null) { builder.withDataDeletionFiles( - getDeletionFiles(dataFiles, deletionIndexFiles)); + getDeletionFiles( + dataFiles, + deletionIndexFilesMap.getOrDefault( + Pair.of(partition, bucket), + Collections.emptyList()))); } splits.add(builder.build()); @@ -419,16 +423,20 @@ private Plan toChangesPlan( buckets.computeIfAbsent(part, k -> new HashSet<>()) .addAll(bucketMap.keySet())); // Read deletion indexes at once to reduce file IO - Map, List> beforDeletionIndexFilesMap = - deletionVectors - ? indexFileHandler.scan( - beforeSnapshot, DELETION_VECTORS_INDEX, beforeFiles.keySet()) - : Collections.emptyMap(); - Map, List> deletionIndexFilesMap = - deletionVectors - ? indexFileHandler.scan( - snapshot, DELETION_VECTORS_INDEX, dataFiles.keySet()) - : Collections.emptyMap(); + Map, List> beforDeletionIndexFilesMap = null; + Map, List> deletionIndexFilesMap = null; + if (!isStreaming) { + beforDeletionIndexFilesMap = + deletionVectors + ? indexFileHandler.scan( + beforeSnapshot, DELETION_VECTORS_INDEX, beforeFiles.keySet()) + : Collections.emptyMap(); + deletionIndexFilesMap = + deletionVectors + ? indexFileHandler.scan( + snapshot, DELETION_VECTORS_INDEX, dataFiles.keySet()) + : Collections.emptyMap(); + } for (Map.Entry> entry : buckets.entrySet()) { BinaryRow part = entry.getKey(); @@ -454,7 +462,9 @@ private Plan toChangesPlan( .withDataFiles(data) .isStreaming(isStreaming) .withBucketPath(pathFactory.bucketPath(part, bucket).toString()); - if (deletionVectors) { + if (deletionVectors + && beforDeletionIndexFilesMap != null + && deletionIndexFilesMap != null) { builder.withBeforeDeletionFiles( getDeletionFiles( before, diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index ecb42d766901..34e05dd8bdad 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -58,6 +58,7 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.OutOfRangeException; import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.StreamTableScan; import org.apache.paimon.table.source.TableRead; @@ -107,6 +108,7 @@ import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN; import static org.apache.paimon.CoreOptions.COMPACTION_MAX_FILE_NUM; import static org.apache.paimon.CoreOptions.CONSUMER_IGNORE_PROGRESS; +import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED; import static org.apache.paimon.CoreOptions.ExpireExecutionMode; import static org.apache.paimon.CoreOptions.FILE_FORMAT; import static org.apache.paimon.CoreOptions.SNAPSHOT_CLEAN_EMPTY_DIRECTORIES; @@ -1652,6 +1654,60 @@ public void testBranchWriteAndRead() throws Exception { "4|40|400|binary|varbinary|mapKey:mapVal|multiset"); } + @Test + public void testDataSplitNotIncludeDvFilesWhenStreamingRead() throws Exception { + FileStoreTable table = createFileStoreTable(); + Map options = new HashMap<>(); + options.put(DELETION_VECTORS_ENABLED.key(), "true"); + options.put(WRITE_ONLY.key(), "true"); + table = table.copy(options); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + for (int i = 0; i < 10; i++) { + write.write(rowData(i, 10 * i, 100L * i)); + commit.commit(i, write.prepareCommit(false, i)); + } + } + + List splits = + toSplits(table.newSnapshotReader().withMode(ScanMode.DELTA).read().dataSplits()); + + for (Split split : splits) { + DataSplit dataSplit = (DataSplit) split; + Assertions.assertThat(dataSplit.deletionFiles().isPresent()).isFalse(); + } + } + + @Test + public void testDataSplitNotIncludeDvFilesWhenStreamingReadChanges() throws Exception { + FileStoreTable table = createFileStoreTable(); + Map options = new HashMap<>(); + options.put(DELETION_VECTORS_ENABLED.key(), "true"); + options.put(WRITE_ONLY.key(), "true"); + table = table.copy(options); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + for (int i = 0; i < 10; i++) { + write.write(rowData(i, 10 * i, 100L * i)); + commit.commit(i, write.prepareCommit(false, i)); + } + } + + List splits = + toSplits( + table.newSnapshotReader() + .withMode(ScanMode.DELTA) + .readChanges() + .dataSplits()); + + for (Split split : splits) { + DataSplit dataSplit = (DataSplit) split; + Assertions.assertThat(dataSplit.deletionFiles().isPresent()).isFalse(); + } + } + protected List getResult( TableRead read, List splits,