diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java index 53e25da2b60c..847f628ef032 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus; import org.apache.tsfile.file.metadata.ChunkMetadata; @@ -35,7 +36,7 @@ import java.util.Map; public class BatchCompactionPlan { - public static final long MAX_CACHED_TIME_CHUNKS_SIZE = 2 * 1024 * 1024; + public static long maxCachedTimeChunksSize = 2 * 1024 * 1024; private final List compactChunkPlans = new ArrayList<>(); private final Map> alignedPageModifiedStatusCache = new HashMap<>(); @@ -50,13 +51,14 @@ public Chunk getTimeChunkFromCache(TsFileSequenceReader reader, ChunkMetadata ch Chunk chunk = cachedTimeChunks.get(key); if (chunk == null) { chunk = reader.readMemChunk(chunkMetadata); + chunk.getData().mark(); } chunk.getData().reset(); return chunk; } public void addTimeChunkToCache(String file, long offset, Chunk chunk) { - if (cachedTimeChunkSize >= MAX_CACHED_TIME_CHUNKS_SIZE) { + if (cachedTimeChunkSize >= maxCachedTimeChunksSize) { return; } chunk.getData().mark(); @@ -98,6 +100,16 @@ public boolean isEmpty() { return compactChunkPlans.isEmpty(); } + @TestOnly + public static void setMaxCachedTimeChunksSize(long size) { + maxCachedTimeChunksSize = size; + } + + @TestOnly + public static long getMaxCachedTimeChunksSize() { + return maxCachedTimeChunksSize; + } + @Override public String toString() { return compactChunkPlans.toString(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java index 43dd0959303f..4ec74c57d63e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java @@ -68,7 +68,7 @@ public abstract class AbstractCompactionEstimator { ((double) SystemInfo.getInstance().getMemorySizeForCompaction() / IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() * IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion()) - + BatchCompactionPlan.MAX_CACHED_TIME_CHUNKS_SIZE; + + BatchCompactionPlan.maxCachedTimeChunksSize; protected abstract long calculatingMetadataMemoryCost(CompactionTaskInfo taskInfo); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/BatchedAlignedSeriesFastInnerCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/BatchedAlignedSeriesFastInnerCompactionTest.java index c4f95addd282..1b4d693d6b97 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/BatchedAlignedSeriesFastInnerCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/BatchedAlignedSeriesFastInnerCompactionTest.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.BatchCompactionPlan; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionCheckerUtils; import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -319,6 +320,42 @@ public void testCompactionByDeserialize() throws Exception { validate(targetResource); } + @Test + public void testCompactionByDeserializeWithLargeTimeChunk() throws Exception { + long defaultMaxCachedTimeChunkSize = BatchCompactionPlan.getMaxCachedTimeChunksSize(); + try { + BatchCompactionPlan.setMaxCachedTimeChunksSize(1); + TsFileResource unseqResource1 = + generateSingleAlignedSeriesFile( + "d0", + Arrays.asList("s0", "s1", "s2"), + new TimeRange[][] { + new TimeRange[] {new TimeRange(100, 200), new TimeRange(500, 600)} + }, + TSEncoding.PLAIN, + CompressionType.LZ4, + Arrays.asList(false, false, false), + false); + unseqResources.add(unseqResource1); + + TsFileResource unseqResource2 = + generateSingleAlignedSeriesFile( + "d0", + Arrays.asList("s0", "s1", "s2"), + new TimeRange[] {new TimeRange(150, 550)}, + TSEncoding.PLAIN, + CompressionType.LZ4, + Arrays.asList(false, false, false), + false); + unseqResources.add(unseqResource2); + + TsFileResource targetResource = performCompaction(); + validate(targetResource); + } finally { + BatchCompactionPlan.setMaxCachedTimeChunksSize(defaultMaxCachedTimeChunkSize); + } + } + @Test public void testCompactionByDeserializeWithEmptyColumn() throws Exception { TsFileResource unseqResource1 =