Skip to content

Commit

Permalink
[core] Introduce deletion files to DataSplit
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Mar 11, 2024
1 parent ba8b8c1 commit 08b2114
Show file tree
Hide file tree
Showing 20 changed files with 485 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ protected IndexManifestFile.Factory indexManifestFileFactory() {
public IndexFileHandler newIndexFileHandler() {
return new IndexFileHandler(
snapshotManager(),
pathFactory().indexFileFactory(),
indexManifestFileFactory().create(),
new HashIndexFile(fileIO, pathFactory().indexFileFactory()),
new DeletionVectorsIndexFile(fileIO, pathFactory().indexFileFactory()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,7 @@ public KeyValueFileStoreRead newRead() {
newKeyComparator(),
userDefinedSeqComparator(),
mfFactory,
newReaderFactoryBuilder(),
options,
newIndexFileHandler());
newReaderFactoryBuilder());
}

public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Optional;

import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand All @@ -39,6 +40,18 @@ public ApplyDeletionVectorReader(RecordReader<T> reader, DeletionVector deletion
this.deletionVector = deletionVector;
}

/**
 * Convenience overload for callers that hold the deletion vector as an {@link Optional}.
 *
 * <p>An empty {@code dv} is unwrapped to {@code null} and handed to the nullable overload of
 * {@code create}, which decides whether the reader needs to be wrapped.
 *
 * @param reader the reader to (possibly) wrap
 * @param dv the deletion vector to apply, if present
 * @return whatever the nullable overload returns for the unwrapped vector
 */
public static <T> RecordReader<T> create(RecordReader<T> reader, Optional<DeletionVector> dv) {
    DeletionVector vectorOrNull = dv.orElse(null);
    return create(reader, vectorOrNull);
}

/**
 * Wraps {@code reader} in an {@link ApplyDeletionVectorReader} when a deletion vector is given.
 *
 * @param reader the reader to (possibly) wrap
 * @param dv the deletion vector to apply, or {@code null} for none
 * @return {@code reader} itself when {@code dv} is {@code null}; otherwise a new
 *     {@link ApplyDeletionVectorReader} built from {@code reader} and {@code dv}
 */
public static <T> RecordReader<T> create(RecordReader<T> reader, @Nullable DeletionVector dv) {
    if (dv != null) {
        return new ApplyDeletionVectorReader<>(reader, dv);
    }

    // Nothing to filter: hand back the reader untouched.
    return reader;
}

@Nullable
@Override
public RecordIterator<T> readBatch() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

package org.apache.paimon.deletionvectors;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.table.source.DeletionFile;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -91,4 +96,26 @@ static DeletionVector deserializeFromBytes(byte[] bytes) {
throw new RuntimeException("Unable to deserialize deletion vector", e);
}
}

/**
 * Reads one deletion vector from the file region described by {@code deletionFile}.
 *
 * <p>The stream is opened on {@code deletionFile.path()} and positioned at
 * {@code deletionFile.offset()}. The first int read there must equal
 * {@code deletionFile.length()}, and the int after it must be
 * {@link BitmapDeletionVector#MAGIC_NUMBER}; the stream is then passed to
 * {@code BitmapDeletionVector.deserializeFromDataInput}.
 *
 * @param fileIO used to open the input stream for the file
 * @param deletionFile supplies the path, offset and expected length of the serialized vector
 * @return the deserialized deletion vector
 * @throws IOException declared for the stream operations (open, seek, read)
 * @throws RuntimeException if the stored length differs from {@code deletionFile.length()} or
 *     the magic number is not {@link BitmapDeletionVector#MAGIC_NUMBER}
 */
static DeletionVector read(FileIO fileIO, DeletionFile deletionFile) throws IOException {
    Path path = new Path(deletionFile.path());
    try (SeekableInputStream input = fileIO.newInputStream(path)) {
        input.seek(deletionFile.offset());
        // Not closed separately: it only wraps `input`, which try-with-resources closes.
        DataInputStream dis = new DataInputStream(input);
        int actualLength = dis.readInt();
        if (actualLength != deletionFile.length()) {
            throw new RuntimeException(
                    "Size not match, actual size: "
                            + actualLength
                            + ", expected size: "
                            + deletionFile.length());
        }
        int magicNum = dis.readInt();
        if (magicNum == BitmapDeletionVector.MAGIC_NUMBER) {
            return BitmapDeletionVector.deserializeFromDataInput(dis);
        } else {
            throw new RuntimeException("Invalid magic number: " + magicNum);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;

/** Maintainer of deletionVectors index. */
public class DeletionVectorsMaintainer {

Expand All @@ -48,11 +50,7 @@ private DeletionVectorsMaintainer(
snapshotId == null
? null
: fileHandler
.scan(
snapshotId,
DeletionVectorsIndexFile.DELETION_VECTORS_INDEX,
partition,
bucket)
.scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket)
.orElse(null);
this.deletionVectors =
indexFile == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public HashIndexFile(FileIO fileIO, PathFactory pathFactory) {
super(fileIO, pathFactory);
}

/**
 * Resolves an index file name to a {@link Path} using this index file's path factory.
 *
 * @param fileName name of the index file
 * @return the result of {@code pathFactory.toPath(fileName)}
 */
public Path path(String fileName) {
    Path resolved = pathFactory.toPath(fileName);
    return resolved;
}

/**
 * Opens the named index file as an {@link IntIterator}.
 *
 * <p>The file name is resolved through the path factory and the resulting path is read with
 * {@code readInts}.
 *
 * @param fileName name of the index file
 * @return the iterator produced by {@code readInts} for the resolved path
 * @throws IOException declared because {@code readInts} may fail while accessing the file
 */
public IntIterator read(String fileName) throws IOException {
    return readInts(this.fileIO, this.pathFactory.toPath(fileName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.utils.IntIterator;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.SnapshotManager;

import java.io.IOException;
Expand All @@ -43,16 +45,19 @@
public class IndexFileHandler {

private final SnapshotManager snapshotManager;
private final PathFactory pathFactory;
private final IndexManifestFile indexManifestFile;
private final HashIndexFile hashIndex;
private final DeletionVectorsIndexFile deletionVectorsIndex;

public IndexFileHandler(
SnapshotManager snapshotManager,
PathFactory pathFactory,
IndexManifestFile indexManifestFile,
HashIndexFile hashIndex,
DeletionVectorsIndexFile deletionVectorsIndex) {
this.snapshotManager = snapshotManager;
this.pathFactory = pathFactory;
this.indexManifestFile = indexManifestFile;
this.hashIndex = hashIndex;
this.deletionVectorsIndex = deletionVectorsIndex;
Expand Down Expand Up @@ -102,6 +107,10 @@ public List<IndexManifestEntry> scan(long snapshotId, String indexType, BinaryRo
return result;
}

/**
 * Resolves the path of an index file from its metadata.
 *
 * @param file metadata whose {@code fileName()} identifies the index file
 * @return the result of {@code pathFactory.toPath} for that file name
 */
public Path filePath(IndexFileMeta file) {
    String fileName = file.fileName();
    return pathFactory.toPath(fileName);
}

/**
 * Reads the hash index of {@code file} and collects it into a list.
 *
 * @param file metadata of the hash index file to read
 * @return the values yielded by {@code readHashIndex(file)}, gathered via
 *     {@code IntIterator.toIntList}
 */
public List<Integer> readHashIndexList(IndexFileMeta file) {
    IntIterator hashes = readHashIndex(file);
    return IntIterator.toIntList(hashes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,5 +282,9 @@ private void applyProjection() {
projectedKeyType = Projection.of(keyProjection).project(keyType);
projectedValueType = Projection.of(valueProjection).project(valueType);
}

/**
 * Exposes the {@code fileIO} reference held by this object.
 *
 * @return the {@code fileIO} field, unchanged
 */
public FileIO fileIO() {
return fileIO;
}
}
}
Loading

0 comments on commit 08b2114

Please sign in to comment.