diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java index 6842a50145ef..f58f2d0ef3e4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java @@ -457,32 +457,7 @@ protected void validateCompactionResult( TsFileValidator validator = TsFileValidator.getInstance(); if (needToValidatePartitionSeqSpaceOverlap) { - List timePartitionSeqFiles = - new ArrayList<>(tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)); - timePartitionSeqFiles.removeAll(sourceSeqFiles); - timePartitionSeqFiles.addAll(validTargetFiles); - timePartitionSeqFiles.sort( - (f1, f2) -> { - int timeDiff = - Long.compareUnsigned( - Long.parseLong(f1.getTsFile().getName().split("-")[0]), - Long.parseLong(f2.getTsFile().getName().split("-")[0])); - return timeDiff == 0 - ? Long.compareUnsigned( - Long.parseLong(f1.getTsFile().getName().split("-")[1]), - Long.parseLong(f2.getTsFile().getName().split("-")[1])) - : timeDiff; - }); - List overlapFilesInTimePartition = - RepairDataFileScanUtil.checkTimePartitionHasOverlap(timePartitionSeqFiles, true); - if (!overlapFilesInTimePartition.isEmpty()) { - LOGGER.error( - "Failed to pass compaction validation, source seq files: {}, source unseq files: {}, target files: {}", - sourceSeqFiles, - sourceUnseqFiles, - targetFiles); - throw new CompactionValidationFailedException(overlapFilesInTimePartition); - } + checkSequenceSpaceOverlap(sourceSeqFiles, sourceUnseqFiles, targetFiles, validTargetFiles); } if (needToValidateTsFileCorrectness && !validator.validateTsFiles(validTargetFiles)) { LOGGER.error( @@ -495,6 +470,87 @@ protected void validateCompactionResult( } } + protected void checkSequenceSpaceOverlap( + List sourceSeqFiles, + List sourceUnseqFiles, + List targetFiles, + List validTargetFiles) { + List timePartitionSeqFiles = + new ArrayList<>(tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)); + timePartitionSeqFiles.removeAll(sourceSeqFiles); + timePartitionSeqFiles.addAll(validTargetFiles); + timePartitionSeqFiles.sort( + (f1, f2) -> { + int timeDiff = + Long.compareUnsigned( + Long.parseLong(f1.getTsFile().getName().split("-")[0]), + Long.parseLong(f2.getTsFile().getName().split("-")[0])); + return timeDiff == 0 + ? Long.compareUnsigned( + Long.parseLong(f1.getTsFile().getName().split("-")[1]), + Long.parseLong(f2.getTsFile().getName().split("-")[1])) + : timeDiff; + }); + if (this instanceof InnerSpaceCompactionTask + || this instanceof InsertionCrossSpaceCompactionTask) { + timePartitionSeqFiles = + filterResourcesByFileTimeIndexInOverlapValidation( + timePartitionSeqFiles, validTargetFiles); + } + List overlapFilesInTimePartition = + RepairDataFileScanUtil.checkTimePartitionHasOverlap(timePartitionSeqFiles, true); + if (!overlapFilesInTimePartition.isEmpty()) { + LOGGER.error( + "Failed to pass compaction overlap validation, source seq files: {}, source unseq files: {}, target files: {}", + sourceSeqFiles, + sourceUnseqFiles, + targetFiles); + for (TsFileResource resource : overlapFilesInTimePartition) { + if (resource.getTsFileRepairStatus() != TsFileRepairStatus.CAN_NOT_REPAIR) { + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK); + } + } + } + } + + private List filterResourcesByFileTimeIndexInOverlapValidation( + List timePartitionSeqFiles, List targetFiles) { + if (targetFiles.isEmpty()) { + return timePartitionSeqFiles; + } + TsFileResource firstTargetResource = targetFiles.get(0); + TsFileResource lastTargetResource = targetFiles.get(targetFiles.size() - 1); + long minStartTimeInTargetFiles = + targetFiles.stream().mapToLong(TsFileResource::getFileStartTime).min().getAsLong(); + long maxEndTimeInTargetFiles = + targetFiles.stream().mapToLong(TsFileResource::getFileEndTime).max().getAsLong(); + List result = new ArrayList<>(timePartitionSeqFiles.size()); + int idx; + for (idx = 0; idx < timePartitionSeqFiles.size(); idx++) { + TsFileResource resource = timePartitionSeqFiles.get(idx); + if (resource == firstTargetResource) { + break; + } + if (resource.getFileEndTime() >= minStartTimeInTargetFiles) { + result.add(resource); + } + } + for (; idx < timePartitionSeqFiles.size(); idx++) { + TsFileResource resource = timePartitionSeqFiles.get(idx); + result.add(resource); + if (resource == lastTargetResource) { + break; + } + } + for (idx += 1; idx < timePartitionSeqFiles.size(); idx++) { + TsFileResource resource = timePartitionSeqFiles.get(idx); + if (resource.getFileStartTime() <= maxEndTimeInTargetFiles) { + result.add(resource); + } + } + return result; + } + public abstract CompactionTaskType getCompactionTaskType(); @TestOnly diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java index fb93be97d030..8cd922a07c74 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java @@ -49,7 +49,7 @@ public class InsertionCrossSpaceCompactionTask extends AbstractCompactionTask { private Phaser phaser; - private boolean failToPassOverlapValidation = false; + private boolean failToPassValidation = false; public InsertionCrossSpaceCompactionTask( Phaser phaser, @@ -178,7 +178,7 @@ protected boolean doCompaction() { String.format("%.2f", costTime)); } catch (Exception e) { if (e instanceof CompactionValidationFailedException) { - failToPassOverlapValidation = true; + failToPassValidation = true; } isSuccess = false; handleException(LOGGER, e); @@ -295,7 +295,7 @@ private boolean shouldRollback() { || (unseqFileToInsert != null && unseqFileToInsert.modFileExists() && !targetFile.modFileExists()) - || failToPassOverlapValidation; + || failToPassValidation; } private void rollback() throws IOException { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java index efab3d89d749..0193c616f1af 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java @@ -51,6 +51,8 @@ import org.junit.Test; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Phaser; @@ -499,10 +501,9 @@ public void testInsertionCompactionUpdateFileMetrics() throws IOException { Assert.assertEquals( unseqFileNumBeforeCompaction - 1, FileMetrics.getInstance().getFileCount(false)); - // overlap TsFileResource unseqResource2 = generateSingleNonAlignedSeriesFileWithDevices( - "3-3-0-0.tsfile", new String[] {"d1"}, new TimeRange[] {new TimeRange(1, 4)}, false); + "3-3-0-0.tsfile", new String[] {"d1"}, new TimeRange[] {new TimeRange(5, 6)}, false); FileMetrics.getInstance() .addTsFile( unseqResource2.getDatabaseName(), @@ -517,6 +518,9 @@ public void testInsertionCompactionUpdateFileMetrics() throws IOException { taskResource = new InsertionCrossCompactionTaskResource(); taskResource.setToInsertUnSeqFile(unseqResource2); task = new InsertionCrossSpaceCompactionTask(null, 0, tsFileManager, taskResource, 0); + // .resource file not found + Files.deleteIfExists( + Paths.get(unseqResource2.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX)); // rollback Assert.assertFalse(task.start()); Assert.assertEquals(seqFileNumBeforeCompaction, FileMetrics.getInstance().getFileCount(true));