Skip to content

Commit

Permalink
unify deletion vector apply
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Mar 12, 2024
1 parent 08b2114 commit 339ef74
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,18 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.source.DeletionFile;

import javax.annotation.Nullable;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* The DeletionVector can efficiently record the positions of rows that are deleted in a file, which
Expand Down Expand Up @@ -118,4 +125,42 @@ static DeletionVector read(FileIO fileIO, DeletionFile deletionFile) throws IOEx
}
}
}

/**
 * Returns a {@link Factory} that never produces a deletion vector: every call to {@link
 * Factory#create(String)} yields {@link Optional#empty()}, whatever the file name.
 */
static Factory emptyFactory() {
    return ignoredFileName -> Optional.empty();
}

/**
 * Creates a {@link Factory} backed by a {@link DeletionVectorsMaintainer}.
 *
 * @param dvMaintainer maintainer whose {@code deletionVectorOf} method answers the lookups;
 *     may be {@code null}
 * @return a factory delegating to {@code dvMaintainer}, or {@link #emptyFactory()} when {@code
 *     dvMaintainer} is {@code null}
 */
static Factory factory(@Nullable DeletionVectorsMaintainer dvMaintainer) {
    if (dvMaintainer != null) {
        return dvMaintainer::deletionVectorOf;
    }
    return emptyFactory();
}

/**
 * Creates a {@link Factory} that reads deletion vectors from deletion files.
 *
 * <p>{@code deletionFiles} is matched to {@code files} by position: the entry at index {@code
 * i} belongs to {@code files.get(i)}, and a {@code null} entry means that data file has no
 * deletion file. It must therefore hold at least {@code files.size()} entries.
 *
 * <p>The returned factory calls {@code DeletionVector.read} on every {@code create} call for a
 * file that has a deletion file; nothing is read while this method runs.
 *
 * @param fileIO file IO used to read the deletion files
 * @param files data files, keyed in the lookup by {@code DataFileMeta#fileName()}
 * @param deletionFiles per-file deletion files, or {@code null} when there are none at all
 * @return the lookup factory, or {@link #emptyFactory()} when {@code deletionFiles} is {@code
 *     null}
 */
static Factory factory(
        FileIO fileIO, List<DataFileMeta> files, @Nullable List<DeletionFile> deletionFiles) {
    if (deletionFiles == null) {
        return emptyFactory();
    }

    // Index the deletion files by the name of the data file they belong to.
    Map<String, DeletionFile> deletionByFileName = new HashMap<>();
    int position = 0;
    for (DataFileMeta dataFile : files) {
        DeletionFile candidate = deletionFiles.get(position++);
        if (candidate != null) {
            deletionByFileName.put(dataFile.fileName(), candidate);
        }
    }

    return fileName -> {
        DeletionFile matched = deletionByFileName.get(fileName);
        if (matched != null) {
            return Optional.of(DeletionVector.read(fileIO, matched));
        }
        return Optional.empty();
    };
}

/**
 * Interface to create {@link DeletionVector}.
 *
 * <p>It declares a single abstract method, so it can be implemented with a lambda or a method
 * reference.
 */
@FunctionalInterface
interface Factory {
    /**
     * Looks up the deletion vector of a data file.
     *
     * @param fileName name of the data file
     * @return the deletion vector of that file, or {@link Optional#empty()} when it has none
     * @throws IOException if the implementation fails while reading the deletion vector
     */
    Optional<DeletionVector> create(String fileName) throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

/** Factory to create {@link RecordReader}s for reading {@link KeyValue} files. */
Expand All @@ -63,9 +62,7 @@ public class KeyValueFileReaderFactory {

private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private final BinaryRow partition;

// FileName to its corresponding deletion vector
private final @Nullable Function<String, Optional<DeletionVector>> deletionVectorSupplier;
private final DeletionVector.Factory dvFactory;

private KeyValueFileReaderFactory(
FileIO fileIO,
Expand All @@ -77,7 +74,7 @@ private KeyValueFileReaderFactory(
DataFilePathFactory pathFactory,
long asyncThreshold,
BinaryRow partition,
@Nullable Function<String, Optional<DeletionVector>> deletionVectorSupplier) {
DeletionVector.Factory dvFactory) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
Expand All @@ -88,7 +85,7 @@ private KeyValueFileReaderFactory(
this.asyncThreshold = asyncThreshold;
this.partition = partition;
this.bulkFormatMappings = new HashMap<>();
this.deletionVectorSupplier = deletionVectorSupplier;
this.dvFactory = dvFactory;
}

public RecordReader<KeyValue> createRecordReader(
Expand Down Expand Up @@ -134,13 +131,9 @@ private RecordReader<KeyValue> createRecordReader(
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
if (deletionVectorSupplier != null) {
Optional<DeletionVector> optionalDeletionVector =
deletionVectorSupplier.apply(fileName);
if (optionalDeletionVector.isPresent() && !optionalDeletionVector.get().isEmpty()) {
recordReader =
new ApplyDeletionVectorReader<>(recordReader, optionalDeletionVector.get());
}
Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
recordReader = new ApplyDeletionVectorReader<>(recordReader, deletionVector.get());
}
return recordReader;
}
Expand Down Expand Up @@ -185,7 +178,6 @@ public static class Builder {
private int[][] valueProjection;
private RowType projectedKeyType;
private RowType projectedValueType;
private @Nullable Function<String, Optional<DeletionVector>> deletionVectorSupplier;

private Builder(
FileIO fileIO,
Expand Down Expand Up @@ -238,12 +230,6 @@ public Builder withValueProjection(int[][] projection) {
return this;
}

public Builder withDeletionVectorSupplier(
Function<String, Optional<DeletionVector>> deletionVectorSupplier) {
this.deletionVectorSupplier = deletionVectorSupplier;
return this;
}

public RowType keyType() {
return keyType;
}
Expand All @@ -252,13 +238,15 @@ public RowType projectedValueType() {
return projectedValueType;
}

public KeyValueFileReaderFactory build(BinaryRow partition, int bucket) {
return build(partition, bucket, true, Collections.emptyList());
public KeyValueFileReaderFactory build(
BinaryRow partition, int bucket, DeletionVector.Factory dvFactory) {
return build(partition, bucket, dvFactory, true, Collections.emptyList());
}

public KeyValueFileReaderFactory build(
BinaryRow partition,
int bucket,
DeletionVector.Factory dvFactory,
boolean projectKeys,
@Nullable List<Predicate> filters) {
int[][] keyProjection = projectKeys ? this.keyProjection : fullKeyProjection;
Expand All @@ -275,7 +263,7 @@ public KeyValueFileReaderFactory build(
pathFactory.createDataFilePathFactory(partition, bucket),
options.fileReaderAsyncThreshold().getBytes(),
partition,
deletionVectorSupplier);
dvFactory);
}

private void applyProjection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
Expand Down Expand Up @@ -247,9 +246,11 @@ private RecordReader<KeyValue> mergeRead(
// Sections are read by SortMergeReader, which sorts and merges records by keys.
// So we cannot project keys or else the sorting will be incorrect.
KeyValueFileReaderFactory overlappedSectionFactory =
readerFactoryBuilder.build(partition, bucket, false, filtersForKeys);
readerFactoryBuilder.build(
partition, bucket, DeletionVector.emptyFactory(), false, filtersForKeys);
KeyValueFileReaderFactory nonOverlappedSectionFactory =
readerFactoryBuilder.build(partition, bucket, false, filtersForAll);
readerFactoryBuilder.build(
partition, bucket, DeletionVector.emptyFactory(), false, filtersForAll);

List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
Expand Down Expand Up @@ -284,27 +285,23 @@ private RecordReader<KeyValue> noMergeRead(
@Nullable List<DeletionFile> deletionFiles,
boolean onlyFilterKey)
throws IOException {
DeletionVector.Factory dvFactory = DeletionVector.factory(fileIO, files, deletionFiles);
KeyValueFileReaderFactory readerFactory =
readerFactoryBuilder.build(
partition, bucket, true, onlyFilterKey ? filtersForKeys : filtersForAll);
partition,
bucket,
dvFactory,
true,
onlyFilterKey ? filtersForKeys : filtersForAll);
List<ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
for (int i = 0; i < files.size(); i++) {
DataFileMeta file = files.get(i);
DeletionFile deletionFile = deletionFiles == null ? null : deletionFiles.get(i);
for (DataFileMeta file : files) {
suppliers.add(
() -> {
// We need to check extraFiles to be compatible with Paimon 0.2.
// See comments on DataFileMeta#extraFiles.
String fileName = changelogFile(file).orElse(file.fileName());
RecordReader<KeyValue> reader =
readerFactory.createRecordReader(
file.schemaId(), fileName, file.fileSize(), file.level());
if (deletionFile != null) {
DeletionVector deletionVector =
DeletionVector.read(fileIO, deletionFile);
reader = ApplyDeletionVectorReader.create(reader, deletionVector);
}
return reader;
return readerFactory.createRecordReader(
file.schemaId(), fileName, file.fileSize(), file.level());
});
}
return ConcatRecordReader.create(suppliers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.paimon.compact.NoopCompactManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
Expand Down Expand Up @@ -255,10 +256,9 @@ private MergeTreeCompactRewriter createRewriter(
@Nullable DeletionVectorsMaintainer dvMaintainer) {
KeyValueFileReaderFactory.Builder readerFactoryBuilder =
this.readerFactoryBuilder.copyWithoutProjection();
if (dvMaintainer != null) {
readerFactoryBuilder.withDeletionVectorSupplier(dvMaintainer::deletionVectorOf);
}
KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build(partition, bucket);
DeletionVector.Factory dvFactory = DeletionVector.factory(dvMaintainer);
KeyValueFileReaderFactory readerFactory =
readerFactoryBuilder.build(partition, bucket, dvFactory);
KeyValueFileWriterFactory writerFactory =
writerFactoryBuilder.build(partition, bucket, options);
MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType, ioManager);
Expand Down Expand Up @@ -290,7 +290,7 @@ private MergeTreeCompactRewriter createRewriter(
lookupReaderFactory =
readerFactoryBuilder
.withValueProjection(new int[0][])
.build(partition, bucket);
.build(partition, bucket, dvFactory);
processor = new ContainsValueProcessor();
wrapperFactory = new FirstRowMergeFunctionWrapperFactory();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
Expand Down Expand Up @@ -123,7 +124,9 @@ public void refreshFiles(

private void newLookupLevels(BinaryRow partition, int bucket, List<DataFileMeta> dataFiles) {
Levels levels = new Levels(keyComparatorSupplier.get(), dataFiles, options.numLevels());
KeyValueFileReaderFactory factory = readerFactoryBuilder.build(partition, bucket);
// TODO: pass a real DeletionVector.Factory; with emptyFactory() the readers built by this
// factory apply no deletion vectors.
KeyValueFileReaderFactory factory =
readerFactoryBuilder.build(partition, bucket, DeletionVector.emptyFactory());
Options options = this.options.toConfiguration();
LookupLevels<KeyValue> lookupLevels =
new LookupLevels<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public Result scan(SnapshotReader reader) {
int bucket = entry.getKey().getRight();
for (List<DataFileMeta> files :
reader.splitGenerator().splitForBatch(entry.getValue())) {
// TODO: pass the deletion files for these data files to the DataSplit built below.
result.add(
DataSplit.builder()
.withSnapshot(endingSnapshotId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.format.FieldStats;
import org.apache.paimon.format.FlushingFileFormat;
import org.apache.paimon.fs.FileIO;
Expand Down Expand Up @@ -294,7 +295,7 @@ private KeyValueFileReaderFactory createReaderFactory(
if (valueProjection != null) {
builder.withValueProjection(valueProjection);
}
return builder.build(BinaryRow.EMPTY_ROW, 0);
return builder.build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory());
}

private void assertData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.format.FlushingFileFormat;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
Expand Down Expand Up @@ -251,7 +252,7 @@ public List<DataField> valueFields(TableSchema schema) {
}
},
new CoreOptions(new HashMap<>()));
return builder.build(BinaryRow.EMPTY_ROW, 0);
return builder.build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory());
}

private SchemaManager createSchemaManager(Path path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.format.FlushingFileFormat;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
Expand Down Expand Up @@ -331,7 +332,7 @@ public List<DataField> valueFields(TableSchema schema) {
}
},
new CoreOptions(new HashMap<>()));
return builder.build(BinaryRow.EMPTY_ROW, 0);
return builder.build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory());
}

private SchemaManager createSchemaManager(Path path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FlushingFileFormat;
import org.apache.paimon.fs.FileStatus;
Expand Down Expand Up @@ -170,8 +171,10 @@ public List<DataField> valueFields(TableSchema schema) {
}
},
new CoreOptions(new HashMap<>()));
readerFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0);
compactReaderFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0);
readerFactory =
readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory());
compactReaderFactory =
readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory());

Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
pathFactoryMap.put(identifier, pathFactory);
Expand Down

0 comments on commit 339ef74

Please sign in to comment.