From 12321ff202b3b43b85b2d27a831e61bc3915ea9a Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Thu, 24 Oct 2024 15:55:11 +0800 Subject: [PATCH 1/5] reduce deserialized resource file num in sequence space compaction task validation --- .../execute/task/AbstractCompactionTask.java | 42 +++++++++++-------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java index 6842a50145ef..bfb50ff43b2e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java @@ -457,24 +457,30 @@ protected void validateCompactionResult( TsFileValidator validator = TsFileValidator.getInstance(); if (needToValidatePartitionSeqSpaceOverlap) { - List timePartitionSeqFiles = - new ArrayList<>(tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)); - timePartitionSeqFiles.removeAll(sourceSeqFiles); - timePartitionSeqFiles.addAll(validTargetFiles); - timePartitionSeqFiles.sort( - (f1, f2) -> { - int timeDiff = - Long.compareUnsigned( - Long.parseLong(f1.getTsFile().getName().split("-")[0]), - Long.parseLong(f2.getTsFile().getName().split("-")[0])); - return timeDiff == 0 - ? Long.compareUnsigned( - Long.parseLong(f1.getTsFile().getName().split("-")[1]), - Long.parseLong(f2.getTsFile().getName().split("-")[1])) - : timeDiff; - }); - List overlapFilesInTimePartition = - RepairDataFileScanUtil.checkTimePartitionHasOverlap(timePartitionSeqFiles, true); + List overlapFilesInTimePartition; + if (this instanceof InnerSpaceCompactionTask) { + overlapFilesInTimePartition = + RepairDataFileScanUtil.checkTimePartitionHasOverlap(targetFiles, true); + } else { + List timePartitionSeqFiles = + new ArrayList<>(tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)); + timePartitionSeqFiles.removeAll(sourceSeqFiles); + timePartitionSeqFiles.addAll(validTargetFiles); + timePartitionSeqFiles.sort( + (f1, f2) -> { + int timeDiff = + Long.compareUnsigned( + Long.parseLong(f1.getTsFile().getName().split("-")[0]), + Long.parseLong(f2.getTsFile().getName().split("-")[0])); + return timeDiff == 0 + ? Long.compareUnsigned( + Long.parseLong(f1.getTsFile().getName().split("-")[1]), + Long.parseLong(f2.getTsFile().getName().split("-")[1])) + : timeDiff; + }); + overlapFilesInTimePartition = + RepairDataFileScanUtil.checkTimePartitionHasOverlap(timePartitionSeqFiles, true); + } if (!overlapFilesInTimePartition.isEmpty()) { LOGGER.error( "Failed to pass compaction validation, source seq files: {}, source unseq files: {}, target files: {}", From b71085dd329cefe462d2cb146a1a793c704c7148 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Thu, 24 Oct 2024 16:06:36 +0800 Subject: [PATCH 2/5] add comment --- .../compaction/execute/task/AbstractCompactionTask.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java index bfb50ff43b2e..6284c271b3e6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java @@ -459,6 +459,8 @@ protected void validateCompactionResult( if (needToValidatePartitionSeqSpaceOverlap) { List overlapFilesInTimePartition; if (this instanceof InnerSpaceCompactionTask) { + // Sequence InnerSpaceCompactionTask doesn't introduce new data into sequence space. + // Therefore, there is no scanning of resources for the entire time partition here. overlapFilesInTimePartition = RepairDataFileScanUtil.checkTimePartitionHasOverlap(targetFiles, true); } else { From 288a4888abe20934c333ab82ddfa164a8e699ca6 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Mon, 28 Oct 2024 15:37:24 +0800 Subject: [PATCH 3/5] filter resources before validate overlap & don't rollback when target files have overlap data --- .../execute/task/AbstractCompactionTask.java | 116 +++++++++++++----- .../InsertionCrossSpaceCompactionTask.java | 6 +- 2 files changed, 85 insertions(+), 37 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java index 6284c271b3e6..fd6a5fcbb13a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java @@ -457,40 +457,7 @@ protected void validateCompactionResult( TsFileValidator validator = TsFileValidator.getInstance(); if (needToValidatePartitionSeqSpaceOverlap) { - List overlapFilesInTimePartition; - if (this instanceof InnerSpaceCompactionTask) { - // Sequence InnerSpaceCompactionTask doesn't introduce new data into sequence space. - // Therefore, there is no scanning of resources for the entire time partition here. - overlapFilesInTimePartition = - RepairDataFileScanUtil.checkTimePartitionHasOverlap(targetFiles, true); - } else { - List timePartitionSeqFiles = - new ArrayList<>(tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)); - timePartitionSeqFiles.removeAll(sourceSeqFiles); - timePartitionSeqFiles.addAll(validTargetFiles); - timePartitionSeqFiles.sort( - (f1, f2) -> { - int timeDiff = - Long.compareUnsigned( - Long.parseLong(f1.getTsFile().getName().split("-")[0]), - Long.parseLong(f2.getTsFile().getName().split("-")[0])); - return timeDiff == 0 - ? Long.compareUnsigned( - Long.parseLong(f1.getTsFile().getName().split("-")[1]), - Long.parseLong(f2.getTsFile().getName().split("-")[1])) - : timeDiff; - }); - overlapFilesInTimePartition = - RepairDataFileScanUtil.checkTimePartitionHasOverlap(timePartitionSeqFiles, true); - } - if (!overlapFilesInTimePartition.isEmpty()) { - LOGGER.error( - "Failed to pass compaction validation, source seq files: {}, source unseq files: {}, target files: {}", - sourceSeqFiles, - sourceUnseqFiles, - targetFiles); - throw new CompactionValidationFailedException(overlapFilesInTimePartition); - } + checkSequenceSpaceOverlap(sourceSeqFiles, sourceUnseqFiles, targetFiles, validTargetFiles); } if (needToValidateTsFileCorrectness && !validator.validateTsFiles(validTargetFiles)) { LOGGER.error( @@ -503,6 +470,87 @@ protected void validateCompactionResult( } } + protected void checkSequenceSpaceOverlap( + List sourceSeqFiles, + List sourceUnseqFiles, + List targetFiles, + List validTargetFiles) { + List timePartitionSeqFiles = + new ArrayList<>(tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)); + timePartitionSeqFiles.removeAll(sourceSeqFiles); + timePartitionSeqFiles.addAll(validTargetFiles); + timePartitionSeqFiles.sort( + (f1, f2) -> { + int timeDiff = + Long.compareUnsigned( + Long.parseLong(f1.getTsFile().getName().split("-")[0]), + Long.parseLong(f2.getTsFile().getName().split("-")[0])); + return timeDiff == 0 + ? Long.compareUnsigned( + Long.parseLong(f1.getTsFile().getName().split("-")[1]), + Long.parseLong(f2.getTsFile().getName().split("-")[1])) + : timeDiff; + }); + if (this instanceof InnerSpaceCompactionTask + || this instanceof InsertionCrossSpaceCompactionTask) { + timePartitionSeqFiles = + filterResourcesByFileTimeIndexInOverlapValidation( + timePartitionSeqFiles, validTargetFiles); + } + List overlapFilesInTimePartition = + RepairDataFileScanUtil.checkTimePartitionHasOverlap(timePartitionSeqFiles, true); + if (!overlapFilesInTimePartition.isEmpty()) { + LOGGER.error( + "Failed to pass compaction overlap validation, source seq files: {}, source unseq files: {}, target files: {}", + sourceSeqFiles, + sourceUnseqFiles, + targetFiles); + for (TsFileResource resource : overlapFilesInTimePartition) { + if (resource.getTsFileRepairStatus() != TsFileRepairStatus.CAN_NOT_REPAIR) { + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK); + } + } + } + } + + private List filterResourcesByFileTimeIndexInOverlapValidation( + List timePartitionSeqFiles, List targetFiles) { + if (targetFiles.isEmpty()) { + return timePartitionSeqFiles; + } + TsFileResource firstTargetResource = targetFiles.get(0); + TsFileResource lastTargetResource = targetFiles.get(targetFiles.size() - 1); + long minStartTimeInTargetFiles = + targetFiles.stream().mapToLong(TsFileResource::getFileStartTime).min().getAsLong(); + long maxEndTimeInTargetFiles = + targetFiles.stream().mapToLong(TsFileResource::getFileEndTime).max().getAsLong(); + List result = new ArrayList<>(timePartitionSeqFiles.size()); + int idx; + for (idx = 0; idx < timePartitionSeqFiles.size(); idx++) { + TsFileResource resource = timePartitionSeqFiles.get(idx); + if (resource == firstTargetResource) { + break; + } + if (resource.getFileEndTime() >= minStartTimeInTargetFiles) { + result.add(resource); + } + } + for (; idx < timePartitionSeqFiles.size(); idx++) { + TsFileResource resource = timePartitionSeqFiles.get(idx); + result.add(resource); + if (resource == lastTargetResource) { + break; + } + } + for (; idx < timePartitionSeqFiles.size(); idx++) { + TsFileResource resource = timePartitionSeqFiles.get(idx); + if (resource.getFileStartTime() <= maxEndTimeInTargetFiles) { + result.add(resource); + } + } + return result; + } + public abstract CompactionTaskType getCompactionTaskType(); @TestOnly diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java index fb93be97d030..8cd922a07c74 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java @@ -49,7 +49,7 @@ public class InsertionCrossSpaceCompactionTask extends AbstractCompactionTask { private Phaser phaser; - private boolean failToPassOverlapValidation = false; + private boolean failToPassValidation = false; public InsertionCrossSpaceCompactionTask( Phaser phaser, @@ -178,7 +178,7 @@ protected boolean doCompaction() { String.format("%.2f", costTime)); } catch (Exception e) { if (e instanceof CompactionValidationFailedException) { - failToPassOverlapValidation = true; + failToPassValidation = true; } isSuccess = false; handleException(LOGGER, e); @@ -295,7 +295,7 @@ private boolean shouldRollback() { || (unseqFileToInsert != null && unseqFileToInsert.modFileExists() && !targetFile.modFileExists()) - || failToPassOverlapValidation; + || failToPassValidation; } private void rollback() throws IOException { From 9e80e3ed1851d4134558fcb4032edaf003437a1f Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Mon, 28 Oct 2024 17:09:30 +0800 Subject: [PATCH 4/5] fix bug --- .../compaction/execute/task/AbstractCompactionTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java index fd6a5fcbb13a..f58f2d0ef3e4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java @@ -542,7 +542,7 @@ private List filterResourcesByFileTimeIndexInOverlapValidation( break; } } - for (; idx < timePartitionSeqFiles.size(); idx++) { + for (idx += 1; idx < timePartitionSeqFiles.size(); idx++) { TsFileResource resource = timePartitionSeqFiles.get(idx); if (resource.getFileStartTime() <= maxEndTimeInTargetFiles) { result.add(resource); From c14ee6ac08dbd189bb1b35a6aec65ad6c60f06e4 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Mon, 28 Oct 2024 18:15:41 +0800 Subject: [PATCH 5/5] fix ut --- .../cross/InsertionCrossSpaceCompactionTest.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java index efab3d89d749..0193c616f1af 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java @@ -51,6 +51,8 @@ import org.junit.Test; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Phaser; @@ -499,10 +501,9 @@ public void testInsertionCompactionUpdateFileMetrics() throws IOException { Assert.assertEquals( unseqFileNumBeforeCompaction - 1, FileMetrics.getInstance().getFileCount(false)); - // overlap TsFileResource unseqResource2 = generateSingleNonAlignedSeriesFileWithDevices( - "3-3-0-0.tsfile", new String[] {"d1"}, new TimeRange[] {new TimeRange(1, 4)}, false); + "3-3-0-0.tsfile", new String[] {"d1"}, new TimeRange[] {new TimeRange(5, 6)}, false); FileMetrics.getInstance() .addTsFile( unseqResource2.getDatabaseName(), @@ -517,6 +518,9 @@ public void testInsertionCompactionUpdateFileMetrics() throws IOException { taskResource = new InsertionCrossCompactionTaskResource(); taskResource.setToInsertUnSeqFile(unseqResource2); task = new InsertionCrossSpaceCompactionTask(null, 0, tsFileManager, taskResource, 0); + // .resource file not found + Files.deleteIfExists( + Paths.get(unseqResource2.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX)); // rollback Assert.assertFalse(task.start()); Assert.assertEquals(seqFileNumBeforeCompaction, FileMetrics.getInstance().getFileCount(true));