Skip to content

Commit

Permalink
Reduce deserialized resource file num in sequence space compaction ta…
Browse files Browse the repository at this point in the history
…sk validation (#13900)

* reduce deserialized resource file num in sequence space compaction task validation

* add comment

* filter resources before validate overlap & don't rollback when target files have overlap data

* fix bug

* fix ut
  • Loading branch information
shuwenwei authored Oct 29, 2024
1 parent 5726de0 commit b280097
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -457,32 +457,7 @@ protected void validateCompactionResult(

TsFileValidator validator = TsFileValidator.getInstance();
if (needToValidatePartitionSeqSpaceOverlap) {
List<TsFileResource> timePartitionSeqFiles =
new ArrayList<>(tsFileManager.getOrCreateSequenceListByTimePartition(timePartition));
timePartitionSeqFiles.removeAll(sourceSeqFiles);
timePartitionSeqFiles.addAll(validTargetFiles);
timePartitionSeqFiles.sort(
(f1, f2) -> {
int timeDiff =
Long.compareUnsigned(
Long.parseLong(f1.getTsFile().getName().split("-")[0]),
Long.parseLong(f2.getTsFile().getName().split("-")[0]));
return timeDiff == 0
? Long.compareUnsigned(
Long.parseLong(f1.getTsFile().getName().split("-")[1]),
Long.parseLong(f2.getTsFile().getName().split("-")[1]))
: timeDiff;
});
List<TsFileResource> overlapFilesInTimePartition =
RepairDataFileScanUtil.checkTimePartitionHasOverlap(timePartitionSeqFiles, true);
if (!overlapFilesInTimePartition.isEmpty()) {
LOGGER.error(
"Failed to pass compaction validation, source seq files: {}, source unseq files: {}, target files: {}",
sourceSeqFiles,
sourceUnseqFiles,
targetFiles);
throw new CompactionValidationFailedException(overlapFilesInTimePartition);
}
checkSequenceSpaceOverlap(sourceSeqFiles, sourceUnseqFiles, targetFiles, validTargetFiles);
}
if (needToValidateTsFileCorrectness && !validator.validateTsFiles(validTargetFiles)) {
LOGGER.error(
Expand All @@ -495,6 +470,87 @@ protected void validateCompactionResult(
}
}

protected void checkSequenceSpaceOverlap(
List<TsFileResource> sourceSeqFiles,
List<TsFileResource> sourceUnseqFiles,
List<TsFileResource> targetFiles,
List<TsFileResource> validTargetFiles) {
List<TsFileResource> timePartitionSeqFiles =
new ArrayList<>(tsFileManager.getOrCreateSequenceListByTimePartition(timePartition));
timePartitionSeqFiles.removeAll(sourceSeqFiles);
timePartitionSeqFiles.addAll(validTargetFiles);
timePartitionSeqFiles.sort(
(f1, f2) -> {
int timeDiff =
Long.compareUnsigned(
Long.parseLong(f1.getTsFile().getName().split("-")[0]),
Long.parseLong(f2.getTsFile().getName().split("-")[0]));
return timeDiff == 0
? Long.compareUnsigned(
Long.parseLong(f1.getTsFile().getName().split("-")[1]),
Long.parseLong(f2.getTsFile().getName().split("-")[1]))
: timeDiff;
});
if (this instanceof InnerSpaceCompactionTask
|| this instanceof InsertionCrossSpaceCompactionTask) {
timePartitionSeqFiles =
filterResourcesByFileTimeIndexInOverlapValidation(
timePartitionSeqFiles, validTargetFiles);
}
List<TsFileResource> overlapFilesInTimePartition =
RepairDataFileScanUtil.checkTimePartitionHasOverlap(timePartitionSeqFiles, true);
if (!overlapFilesInTimePartition.isEmpty()) {
LOGGER.error(
"Failed to pass compaction overlap validation, source seq files: {}, source unseq files: {}, target files: {}",
sourceSeqFiles,
sourceUnseqFiles,
targetFiles);
for (TsFileResource resource : overlapFilesInTimePartition) {
if (resource.getTsFileRepairStatus() != TsFileRepairStatus.CAN_NOT_REPAIR) {
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK);
}
}
}
}

private List<TsFileResource> filterResourcesByFileTimeIndexInOverlapValidation(
List<TsFileResource> timePartitionSeqFiles, List<TsFileResource> targetFiles) {
if (targetFiles.isEmpty()) {
return timePartitionSeqFiles;
}
TsFileResource firstTargetResource = targetFiles.get(0);
TsFileResource lastTargetResource = targetFiles.get(targetFiles.size() - 1);
long minStartTimeInTargetFiles =
targetFiles.stream().mapToLong(TsFileResource::getFileStartTime).min().getAsLong();
long maxEndTimeInTargetFiles =
targetFiles.stream().mapToLong(TsFileResource::getFileEndTime).max().getAsLong();
List<TsFileResource> result = new ArrayList<>(timePartitionSeqFiles.size());
int idx;
for (idx = 0; idx < timePartitionSeqFiles.size(); idx++) {
TsFileResource resource = timePartitionSeqFiles.get(idx);
if (resource == firstTargetResource) {
break;
}
if (resource.getFileEndTime() >= minStartTimeInTargetFiles) {
result.add(resource);
}
}
for (; idx < timePartitionSeqFiles.size(); idx++) {
TsFileResource resource = timePartitionSeqFiles.get(idx);
result.add(resource);
if (resource == lastTargetResource) {
break;
}
}
for (idx += 1; idx < timePartitionSeqFiles.size(); idx++) {
TsFileResource resource = timePartitionSeqFiles.get(idx);
if (resource.getFileStartTime() <= maxEndTimeInTargetFiles) {
result.add(resource);
}
}
return result;
}

public abstract CompactionTaskType getCompactionTaskType();

@TestOnly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
public class InsertionCrossSpaceCompactionTask extends AbstractCompactionTask {

private Phaser phaser;
private boolean failToPassOverlapValidation = false;
private boolean failToPassValidation = false;

public InsertionCrossSpaceCompactionTask(
Phaser phaser,
Expand Down Expand Up @@ -178,7 +178,7 @@ protected boolean doCompaction() {
String.format("%.2f", costTime));
} catch (Exception e) {
if (e instanceof CompactionValidationFailedException) {
failToPassOverlapValidation = true;
failToPassValidation = true;
}
isSuccess = false;
handleException(LOGGER, e);
Expand Down Expand Up @@ -295,7 +295,7 @@ private boolean shouldRollback() {
|| (unseqFileToInsert != null
&& unseqFileToInsert.modFileExists()
&& !targetFile.modFileExists())
|| failToPassOverlapValidation;
|| failToPassValidation;
}

private void rollback() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.junit.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;
Expand Down Expand Up @@ -499,10 +501,9 @@ public void testInsertionCompactionUpdateFileMetrics() throws IOException {
Assert.assertEquals(
unseqFileNumBeforeCompaction - 1, FileMetrics.getInstance().getFileCount(false));

// overlap
TsFileResource unseqResource2 =
generateSingleNonAlignedSeriesFileWithDevices(
"3-3-0-0.tsfile", new String[] {"d1"}, new TimeRange[] {new TimeRange(1, 4)}, false);
"3-3-0-0.tsfile", new String[] {"d1"}, new TimeRange[] {new TimeRange(5, 6)}, false);
FileMetrics.getInstance()
.addTsFile(
unseqResource2.getDatabaseName(),
Expand All @@ -517,6 +518,9 @@ public void testInsertionCompactionUpdateFileMetrics() throws IOException {
taskResource = new InsertionCrossCompactionTaskResource();
taskResource.setToInsertUnSeqFile(unseqResource2);
task = new InsertionCrossSpaceCompactionTask(null, 0, tsFileManager, taskResource, 0);
// .resource file not found
Files.deleteIfExists(
Paths.get(unseqResource2.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX));
// rollback
Assert.assertFalse(task.start());
Assert.assertEquals(seqFileNumBeforeCompaction, FileMetrics.getInstance().getFileCount(true));
Expand Down

0 comments on commit b280097

Please sign in to comment.