From 339ef74eaf78ef461e13ce807e640410a6d5ac78 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Tue, 12 Mar 2024 13:49:40 +0800 Subject: [PATCH] unify deletion vector apply --- .../deletionvectors/DeletionVector.java | 45 +++++++++++++++++++ .../paimon/io/KeyValueFileReaderFactory.java | 34 +++++--------- .../operation/KeyValueFileStoreRead.java | 29 ++++++------ .../operation/KeyValueFileStoreWrite.java | 10 ++--- .../paimon/table/query/LocalTableQuery.java | 5 ++- .../snapshot/IncrementalStartingScanner.java | 1 + .../paimon/io/KeyValueFileReadWriteTest.java | 3 +- .../paimon/mergetree/ContainsLevelsTest.java | 3 +- .../paimon/mergetree/LookupLevelsTest.java | 3 +- .../paimon/mergetree/MergeTreeTestBase.java | 7 ++- 10 files changed, 90 insertions(+), 50 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java index 50aa8f64ea09..be20d3891410 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java @@ -21,11 +21,18 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.table.source.DeletionFile; +import javax.annotation.Nullable; + import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; /** * The DeletionVector can efficiently record the positions of rows that are deleted in a file, which @@ -118,4 +125,42 @@ static DeletionVector read(FileIO fileIO, DeletionFile deletionFile) throws IOEx } } } + + static Factory emptyFactory() { + return fileName -> Optional.empty(); + } + + static Factory factory(@Nullable DeletionVectorsMaintainer dvMaintainer) { + if (dvMaintainer == null) { + return emptyFactory(); + } + return dvMaintainer::deletionVectorOf; + } + + static Factory factory( + FileIO fileIO, List files, @Nullable List deletionFiles) { + if (deletionFiles == null) { + return emptyFactory(); + } + Map fileToDeletion = new HashMap<>(); + for (int i = 0; i < files.size(); i++) { + DeletionFile deletionFile = deletionFiles.get(i); + if (deletionFile != null) { + fileToDeletion.put(files.get(i).fileName(), deletionFile); + } + } + return fileName -> { + DeletionFile deletionFile = fileToDeletion.get(fileName); + if (deletionFile == null) { + return Optional.empty(); + } + + return Optional.of(DeletionVector.read(fileIO, deletionFile)); + }; + } + + /** Interface to create {@link DeletionVector}. */ + interface Factory { + Optional create(String fileName) throws IOException; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index b946756aeee2..63fef31fc142 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -45,7 +45,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Function; import java.util.function.Supplier; /** Factory to create {@link RecordReader}s for reading {@link KeyValue} files. */ @@ -63,9 +62,7 @@ public class KeyValueFileReaderFactory { private final Map bulkFormatMappings; private final BinaryRow partition; - - // FileName to its corresponding deletion vector - private final @Nullable Function> deletionVectorSupplier; + private final DeletionVector.Factory dvFactory; private KeyValueFileReaderFactory( FileIO fileIO, @@ -77,7 +74,7 @@ private KeyValueFileReaderFactory( DataFilePathFactory pathFactory, long asyncThreshold, BinaryRow partition, - @Nullable Function> deletionVectorSupplier) { + DeletionVector.Factory dvFactory) { this.fileIO = fileIO; this.schemaManager = schemaManager; this.schemaId = schemaId; @@ -88,7 +85,7 @@ private KeyValueFileReaderFactory( this.asyncThreshold = asyncThreshold; this.partition = partition; this.bulkFormatMappings = new HashMap<>(); - this.deletionVectorSupplier = deletionVectorSupplier; + this.dvFactory = dvFactory; } public RecordReader createRecordReader( @@ -134,13 +131,9 @@ private RecordReader createRecordReader( bulkFormatMapping.getIndexMapping(), bulkFormatMapping.getCastMapping(), PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition)); - if (deletionVectorSupplier != null) { - Optional optionalDeletionVector = - deletionVectorSupplier.apply(fileName); - if (optionalDeletionVector.isPresent() && !optionalDeletionVector.get().isEmpty()) { - recordReader = - new ApplyDeletionVectorReader<>(recordReader, optionalDeletionVector.get()); - } + Optional deletionVector = dvFactory.create(fileName); + if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) { + recordReader = new ApplyDeletionVectorReader<>(recordReader, deletionVector.get()); } return recordReader; } @@ -185,7 +178,6 @@ public static class Builder { private int[][] valueProjection; private RowType projectedKeyType; private RowType projectedValueType; - private @Nullable Function> deletionVectorSupplier; private Builder( FileIO fileIO, @@ -238,12 +230,6 @@ public Builder withValueProjection(int[][] projection) { return this; } - public Builder withDeletionVectorSupplier( - Function> deletionVectorSupplier) { - this.deletionVectorSupplier = deletionVectorSupplier; - return this; - } - public RowType keyType() { return keyType; } @@ -252,13 +238,15 @@ public RowType projectedValueType() { return projectedValueType; } - public KeyValueFileReaderFactory build(BinaryRow partition, int bucket) { - return build(partition, bucket, true, Collections.emptyList()); + public KeyValueFileReaderFactory build( + BinaryRow partition, int bucket, DeletionVector.Factory dvFactory) { + return build(partition, bucket, dvFactory, true, Collections.emptyList()); } public KeyValueFileReaderFactory build( BinaryRow partition, int bucket, + DeletionVector.Factory dvFactory, boolean projectKeys, @Nullable List filters) { int[][] keyProjection = projectKeys ? this.keyProjection : fullKeyProjection; @@ -275,7 +263,7 @@ public KeyValueFileReaderFactory build( pathFactory.createDataFilePathFactory(partition, bucket), options.fileReaderAsyncThreshold().getBytes(), partition, - deletionVectorSupplier); + dvFactory); } private void applyProjection() { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java index f1e7a80a2975..a166a8526aa4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java @@ -23,7 +23,6 @@ import org.apache.paimon.KeyValueFileStore; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.disk.IOManager; import org.apache.paimon.fs.FileIO; @@ -247,9 +246,11 @@ private RecordReader mergeRead( // Sections are read by SortMergeReader, which sorts and merges records by keys. // So we cannot project keys or else the sorting will be incorrect. KeyValueFileReaderFactory overlappedSectionFactory = - readerFactoryBuilder.build(partition, bucket, false, filtersForKeys); + readerFactoryBuilder.build( + partition, bucket, DeletionVector.emptyFactory(), false, filtersForKeys); KeyValueFileReaderFactory nonOverlappedSectionFactory = - readerFactoryBuilder.build(partition, bucket, false, filtersForAll); + readerFactoryBuilder.build( + partition, bucket, DeletionVector.emptyFactory(), false, filtersForAll); List> sectionReaders = new ArrayList<>(); MergeFunctionWrapper mergeFuncWrapper = @@ -284,27 +285,23 @@ private RecordReader noMergeRead( @Nullable List deletionFiles, boolean onlyFilterKey) throws IOException { + DeletionVector.Factory dvFactory = DeletionVector.factory(fileIO, files, deletionFiles); KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build( - partition, bucket, true, onlyFilterKey ? filtersForKeys : filtersForAll); + partition, + bucket, + dvFactory, + true, + onlyFilterKey ? filtersForKeys : filtersForAll); List> suppliers = new ArrayList<>(); - for (int i = 0; i < files.size(); i++) { - DataFileMeta file = files.get(i); - DeletionFile deletionFile = deletionFiles == null ? null : deletionFiles.get(i); + for (DataFileMeta file : files) { suppliers.add( () -> { // We need to check extraFiles to be compatible with Paimon 0.2. // See comments on DataFileMeta#extraFiles. String fileName = changelogFile(file).orElse(file.fileName()); - RecordReader reader = - readerFactory.createRecordReader( - file.schemaId(), fileName, file.fileSize(), file.level()); - if (deletionFile != null) { - DeletionVector deletionVector = - DeletionVector.read(fileIO, deletionFile); - reader = ApplyDeletionVectorReader.create(reader, deletionVector); - } - return reader; + return readerFactory.createRecordReader( + file.schemaId(), fileName, file.fileSize(), file.level()); }); } return ConcatRecordReader.create(suppliers); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 441f8f841003..41b77d612885 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -29,6 +29,7 @@ import org.apache.paimon.compact.NoopCompactManager; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.fs.FileIO; @@ -255,10 +256,9 @@ private MergeTreeCompactRewriter createRewriter( @Nullable DeletionVectorsMaintainer dvMaintainer) { KeyValueFileReaderFactory.Builder readerFactoryBuilder = this.readerFactoryBuilder.copyWithoutProjection(); - if (dvMaintainer != null) { - readerFactoryBuilder.withDeletionVectorSupplier(dvMaintainer::deletionVectorOf); - } - KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build(partition, bucket); + DeletionVector.Factory dvFactory = DeletionVector.factory(dvMaintainer); + KeyValueFileReaderFactory readerFactory = + readerFactoryBuilder.build(partition, bucket, dvFactory); KeyValueFileWriterFactory writerFactory = writerFactoryBuilder.build(partition, bucket, options); MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType, ioManager); @@ -290,7 +290,7 @@ private MergeTreeCompactRewriter createRewriter( lookupReaderFactory = readerFactoryBuilder .withValueProjection(new int[0][]) - .build(partition, bucket); + .build(partition, bucket, dvFactory); processor = new ContainsValueProcessor(); wrapperFactory = new FirstRowMergeFunctionWrapperFactory(); } else { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index e10184fefc43..99ad909ff751 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -26,6 +26,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.disk.IOManager; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; @@ -123,7 +124,9 @@ public void refreshFiles( private void newLookupLevels(BinaryRow partition, int bucket, List dataFiles) { Levels levels = new Levels(keyComparatorSupplier.get(), dataFiles, options.numLevels()); - KeyValueFileReaderFactory factory = readerFactoryBuilder.build(partition, bucket); + // TODO pass DeletionVector factory + KeyValueFileReaderFactory factory = + readerFactoryBuilder.build(partition, bucket, DeletionVector.emptyFactory()); Options options = this.options.toConfiguration(); LookupLevels lookupLevels = new LookupLevels<>( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java index 024525c208ce..17df85832de0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java @@ -67,6 +67,7 @@ public Result scan(SnapshotReader reader) { int bucket = entry.getKey().getRight(); for (List files : reader.splitGenerator().splitForBatch(entry.getValue())) { + // TODO pass deletion files result.add( DataSplit.builder() .withSnapshot(endingSnapshotId) diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index df6110ce645c..05f260097a4d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -25,6 +25,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.format.FieldStats; import org.apache.paimon.format.FlushingFileFormat; import org.apache.paimon.fs.FileIO; @@ -294,7 +295,7 @@ private KeyValueFileReaderFactory createReaderFactory( if (valueProjection != null) { builder.withValueProjection(valueProjection); } - return builder.build(BinaryRow.EMPTY_ROW, 0); + return builder.build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory()); } private void assertData( diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index 631a6e47f9dd..58d9dbe904d9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.format.FlushingFileFormat; import org.apache.paimon.fs.FileIOFinder; import org.apache.paimon.fs.Path; @@ -251,7 +252,7 @@ public List valueFields(TableSchema schema) { } }, new CoreOptions(new HashMap<>())); - return builder.build(BinaryRow.EMPTY_ROW, 0); + return builder.build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory()); } private SchemaManager createSchemaManager(Path path) { diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index e7df8fa8da65..00d8eeb5a8e4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.format.FlushingFileFormat; import org.apache.paimon.fs.FileIOFinder; import org.apache.paimon.fs.Path; @@ -331,7 +332,7 @@ public List valueFields(TableSchema schema) { } }, new CoreOptions(new HashMap<>())); - return builder.build(BinaryRow.EMPTY_ROW, 0); + return builder.build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory()); } private SchemaManager createSchemaManager(Path path) { diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 8e8f315c95d9..0e28ce970b3d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -26,6 +26,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FlushingFileFormat; import org.apache.paimon.fs.FileStatus; @@ -170,8 +171,10 @@ public List valueFields(TableSchema schema) { } }, new CoreOptions(new HashMap<>())); - readerFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0); - compactReaderFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0); + readerFactory = + readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory()); + compactReaderFactory = + readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory()); Map pathFactoryMap = new HashMap<>(); pathFactoryMap.put(identifier, pathFactory);