Skip to content

Commit

Permalink
[to rc/1.3.3] Fix compaction cached time chunk flip twice (#13904)
Browse files Browse the repository at this point in the history
* fix cached time chunk flip twice

* add ut

* rewind
  • Loading branch information
shuwenwei authored and HTHou committed Nov 11, 2024
1 parent 58e3a69 commit d0d2833
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils;

import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;

import org.apache.tsfile.file.metadata.ChunkMetadata;
Expand All @@ -35,7 +36,7 @@
import java.util.Map;

public class BatchCompactionPlan {
public static final long MAX_CACHED_TIME_CHUNKS_SIZE = 2 * 1024 * 1024;
public static long maxCachedTimeChunksSize = 2 * 1024 * 1024;
private final List<CompactChunkPlan> compactChunkPlans = new ArrayList<>();
private final Map<String, Map<TimeRange, ModifiedStatus>> alignedPageModifiedStatusCache =
new HashMap<>();
Expand All @@ -51,12 +52,12 @@ public Chunk getTimeChunkFromCache(TsFileSequenceReader reader, ChunkMetadata ch
if (chunk == null) {
chunk = reader.readMemChunk(chunkMetadata);
}
chunk.getData().flip();
chunk.getData().rewind();
return chunk;
}

public void addTimeChunkToCache(String file, long offset, Chunk chunk) {
if (cachedTimeChunkSize >= MAX_CACHED_TIME_CHUNKS_SIZE) {
if (cachedTimeChunkSize >= maxCachedTimeChunksSize) {
return;
}
cachedTimeChunks.put(
Expand Down Expand Up @@ -96,6 +97,16 @@ public boolean isEmpty() {
return compactChunkPlans.isEmpty();
}

@TestOnly
public static void setMaxCachedTimeChunksSize(long size) {
maxCachedTimeChunksSize = size;
}

@TestOnly
public static long getMaxCachedTimeChunksSize() {
return maxCachedTimeChunksSize;
}

@Override
public String toString() {
return compactChunkPlans.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public abstract class AbstractCompactionEstimator {
((double) SystemInfo.getInstance().getMemorySizeForCompaction()
/ IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
* IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion())
+ BatchCompactionPlan.MAX_CACHED_TIME_CHUNKS_SIZE;
+ BatchCompactionPlan.maxCachedTimeChunksSize;

protected abstract long calculatingMetadataMemoryCost(CompactionTaskInfo taskInfo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.BatchCompactionPlan;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionCheckerUtils;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
Expand Down Expand Up @@ -318,6 +319,42 @@ public void testCompactionByDeserialize() throws Exception {
validate(targetResource);
}

@Test
public void testCompactionByDeserializeWithLargeTimeChunk() throws Exception {
long defaultMaxCachedTimeChunkSize = BatchCompactionPlan.getMaxCachedTimeChunksSize();
try {
BatchCompactionPlan.setMaxCachedTimeChunksSize(1);
TsFileResource unseqResource1 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[][] {
new TimeRange[] {new TimeRange(100, 200), new TimeRange(500, 600)}
},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false),
false);
unseqResources.add(unseqResource1);

TsFileResource unseqResource2 =
generateSingleAlignedSeriesFile(
"d0",
Arrays.asList("s0", "s1", "s2"),
new TimeRange[] {new TimeRange(150, 550)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false, false),
false);
unseqResources.add(unseqResource2);

TsFileResource targetResource = performCompaction();
validate(targetResource);
} finally {
BatchCompactionPlan.setMaxCachedTimeChunksSize(defaultMaxCachedTimeChunkSize);
}
}

@Test
public void testCompactionByDeserializeWithEmptyColumn() throws Exception {
TsFileResource unseqResource1 =
Expand Down

0 comments on commit d0d2833

Please sign in to comment.