Skip to content

Commit

Permalink
add comment and optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
SpriCoder committed Jan 25, 2025
1 parent 0eabba8 commit 002c97d
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2216,23 +2216,23 @@ private void initMemoryAllocate(TrimProperties properties) {
}
// storage engine memory manager
MemoryManager storageEngineMemoryManager =
globalMemoryManager.createMemoryManager("StorageEngine", storageEngineMemorySize);
globalMemoryManager.getOrCreateMemoryManager("StorageEngine", storageEngineMemorySize);
conf.setStorageEngineMemoryManager(storageEngineMemoryManager);
// query engine memory manager
MemoryManager queryEngineMemoryManager =
globalMemoryManager.createMemoryManager("QueryEngine", queryEngineMemorySize);
globalMemoryManager.getOrCreateMemoryManager("QueryEngine", queryEngineMemorySize);
conf.setQueryEngineMemoryManager(queryEngineMemoryManager);
// schema engine memory manager
MemoryManager schemaEngineMemoryManager =
globalMemoryManager.createMemoryManager("SchemaEngine", schemaEngineMemorySize);
globalMemoryManager.getOrCreateMemoryManager("SchemaEngine", schemaEngineMemorySize);
conf.setSchemaEngineMemoryManager(schemaEngineMemoryManager);
// consensus layer memory manager
MemoryManager consensusMemoryManager =
globalMemoryManager.createMemoryManager("Consensus", consensusMemorySize);
globalMemoryManager.getOrCreateMemoryManager("Consensus", consensusMemorySize);
conf.setConsensusMemoryManager(consensusMemoryManager);
// pipe memory manager
MemoryManager pipeMemoryManager =
globalMemoryManager.createMemoryManager("Pipe", pipeMemorySize);
globalMemoryManager.getOrCreateMemoryManager("Pipe", pipeMemorySize);
conf.setPipeMemoryManager(pipeMemoryManager);

LOGGER.info(
Expand All @@ -2257,7 +2257,7 @@ private void initMemoryAllocate(TrimProperties properties) {

String offHeapMemoryStr = System.getProperty("OFF_HEAP_MEMORY");
MemoryManager offHeapMemoryManager =
globalMemoryManager.createMemoryManager(
globalMemoryManager.gerOrCreateMemoryManager(
"OffHeap", MemUtils.strToBytesCnt(offHeapMemoryStr), false);
conf.setOffHeapMemoryManager(offHeapMemoryManager);

Expand All @@ -2270,7 +2270,8 @@ private void initMemoryAllocate(TrimProperties properties) {
(offHeapMemoryManager.getTotalMemorySizeInBytes()
* conf.getMaxDirectBufferOffHeapMemorySizeProportion());
MemoryManager directBufferMemoryManager =
offHeapMemoryManager.createMemoryManager("DirectBuffer", totalDirectBufferMemorySizeLimit);
offHeapMemoryManager.getOrCreateMemoryManager(
"DirectBuffer", totalDirectBufferMemorySizeLimit);
conf.setDirectBufferMemoryManager(directBufferMemoryManager);
}

Expand Down Expand Up @@ -2330,32 +2331,34 @@ private void initStorageEngineAllocate(
}
}
MemoryManager writeMemoryManager =
storageEngineMemoryManager.createMemoryManager("Write", writeMemorySize);
storageEngineMemoryManager.getOrCreateMemoryManager("Write", writeMemorySize);
conf.setWriteMemoryManager(writeMemoryManager);
MemoryManager compactionMemoryManager =
storageEngineMemoryManager.createMemoryManager("Compaction", compactionMemorySize);
storageEngineMemoryManager.getOrCreateMemoryManager("Compaction", compactionMemorySize);
conf.setCompactionMemoryManager(compactionMemoryManager);
MemoryManager memtableMemoryManager =
writeMemoryManager.createMemoryManager("Memtable", memtableMemorySize);
writeMemoryManager.getOrCreateMemoryManager("Memtable", memtableMemorySize);
conf.setMemtableMemoryManager(memtableMemoryManager);
MemoryManager timePartitionMemoryManager =
writeMemoryManager.createMemoryManager("TimePartitionInfo", timePartitionInfoMemorySize);
writeMemoryManager.getOrCreateMemoryManager(
"TimePartitionInfo", timePartitionInfoMemorySize);
conf.setTimePartitionInfoMemoryManager(timePartitionMemoryManager);
long devicePathCacheMemorySize =
(long) (memtableMemorySize * conf.getDevicePathCacheProportion());
MemoryManager devicePathCacheMemoryManager =
memtableMemoryManager.createMemoryManager("DevicePathCache", devicePathCacheMemorySize);
memtableMemoryManager.getOrCreateMemoryManager(
"DevicePathCache", devicePathCacheMemorySize);
conf.setDevicePathCacheMemoryManager(devicePathCacheMemoryManager);
// TODO @spricoder check why this memory calculate by storage engine memory
long bufferedArraysMemorySize =
(long) (storageMemoryTotal * conf.getBufferedArraysMemoryProportion());
MemoryManager bufferedArraysMemoryManager =
memtableMemoryManager.createMemoryManager("BufferedArray", bufferedArraysMemorySize);
memtableMemoryManager.getOrCreateMemoryManager("BufferedArray", bufferedArraysMemorySize);
conf.setBufferedArraysMemoryManager(bufferedArraysMemoryManager);
long walBufferQueueMemorySize =
(long) (memtableMemorySize * conf.getWalBufferQueueProportion());
MemoryManager walBufferQueueMemoryManager =
memtableMemoryManager.createMemoryManager("WalBufferQueue", walBufferQueueMemorySize);
memtableMemoryManager.getOrCreateMemoryManager("WalBufferQueue", walBufferQueueMemorySize);
conf.setWalBufferQueueManager(walBufferQueueMemoryManager);
}

Expand Down Expand Up @@ -2388,23 +2391,23 @@ private void initSchemaMemoryAllocate(
}

MemoryManager schemaRegionMemoryManager =
schemaEngineMemoryManager.createMemoryManager(
schemaEngineMemoryManager.getOrCreateMemoryManager(
"SchemaRegion", schemaMemoryTotal * schemaMemoryProportion[0] / proportionSum);
conf.setSchemaRegionMemoryManager(schemaRegionMemoryManager);
LOGGER.info(
"allocateMemoryForSchemaRegion = {}",
conf.getSchemaRegionMemoryManager().getTotalMemorySizeInBytes());

MemoryManager schemaCacheMemoryManager =
schemaEngineMemoryManager.createMemoryManager(
schemaEngineMemoryManager.getOrCreateMemoryManager(
"SchemaCache", schemaMemoryTotal * schemaMemoryProportion[1] / proportionSum);
conf.setSchemaCacheMemoryManager(schemaCacheMemoryManager);
LOGGER.info(
"allocateMemoryForSchemaCache = {}",
conf.getSchemaCacheMemoryManager().getTotalMemorySizeInBytes());

MemoryManager partitionCacheMemoryManager =
schemaEngineMemoryManager.createMemoryManager(
schemaEngineMemoryManager.getOrCreateMemoryManager(
"PartitionCache", schemaMemoryTotal * schemaMemoryProportion[2] / proportionSum);
conf.setPartitionCacheMemoryManager(partitionCacheMemoryManager);
LOGGER.info(
Expand Down Expand Up @@ -2482,33 +2485,33 @@ private void initQueryEngineMemoryAllocate(
conf.setMaxBytesPerFragmentInstance(dataExchangeMemorySize / conf.getQueryThreadCount());

MemoryManager bloomFilterCacheMemoryManager =
queryEngineMemoryManager.createMemoryManager(
queryEngineMemoryManager.getOrCreateMemoryManager(
"BloomFilterCache", bloomFilterCacheMemorySize);
conf.setBloomFilterCacheMemoryManager(bloomFilterCacheMemoryManager);

MemoryManager chunkCacheMemoryManager =
queryEngineMemoryManager.createMemoryManager("ChunkCache", chunkCacheMemorySize);
queryEngineMemoryManager.getOrCreateMemoryManager("ChunkCache", chunkCacheMemorySize);
conf.setChunkCacheMemoryManager(chunkCacheMemoryManager);

MemoryManager timeSeriesMetaDataCacheMemoryManager =
queryEngineMemoryManager.createMemoryManager(
queryEngineMemoryManager.getOrCreateMemoryManager(
"TimeSeriesMetaDataCache", timeSeriesMetaDataCacheMemorySize);
conf.setTimeSeriesMetaDataCacheMemoryManager(timeSeriesMetaDataCacheMemoryManager);

MemoryManager coordinatorMemoryManager =
queryEngineMemoryManager.createMemoryManager("Coordinator", coordinatorMemorySize);
queryEngineMemoryManager.getOrCreateMemoryManager("Coordinator", coordinatorMemorySize);
conf.setCoordinatorMemoryManager(coordinatorMemoryManager);

MemoryManager operatorsMemoryManager =
queryEngineMemoryManager.createMemoryManager("Operators", operatorsMemorySize);
queryEngineMemoryManager.getOrCreateMemoryManager("Operators", operatorsMemorySize);
conf.setOperatorsMemoryManager(operatorsMemoryManager);

MemoryManager dataExchangeMemoryManager =
queryEngineMemoryManager.createMemoryManager("DataExchange", dataExchangeMemorySize);
queryEngineMemoryManager.getOrCreateMemoryManager("DataExchange", dataExchangeMemorySize);
conf.setDataExchangeMemoryManager(dataExchangeMemoryManager);

MemoryManager timeIndexMemoryManager =
queryEngineMemoryManager.createMemoryManager("TimeIndex", timeIndexMemorySize);
queryEngineMemoryManager.getOrCreateMemoryManager("TimeIndex", timeIndexMemorySize);
conf.setTimeIndexMemoryManager(timeIndexMemoryManager);
}

Expand Down Expand Up @@ -2967,13 +2970,13 @@ public void reclaimConsensusMemory() {
long newSize =
storageEngineMemoryManager.getTotalMemorySizeInBytes()
+ consensusMemoryManager.getTotalMemorySizeInBytes();
globalMemoryManager.removeChild("StorageEngine");
storageEngineMemoryManager.clear();
globalMemoryManager.removeChildMemoryManager("StorageEngine");
storageEngineMemoryManager.clearAll();
// @Spricoder to find a better way
consensusMemoryManager.resize(0);
consensusMemoryManager.setTotalMemorySizeInBytes(0);
// then we need to allocate the memory to storage engine
conf.setStorageEngineMemoryManager(
globalMemoryManager.createMemoryManager("StorageEngine", newSize));
globalMemoryManager.getOrCreateMemoryManager("StorageEngine", newSize));
SystemInfo.getInstance().allocateWriteMemory();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ private PipeMemoryBlock forceAllocate(long sizeInBytes, PipeMemoryBlockType type

public synchronized void forceResize(PipeMemoryBlock block, long targetSize) {
if (block == null || block.isReleased()) {
LOGGER.warn("forceResize: cannot resize a null or released memory block");
LOGGER.warn("forceResize: cannot setTotalMemorySizeInBytes a null or released memory block");
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
Expand All @@ -34,7 +33,6 @@

public class GlobalMemoryMetrics implements IMetricSet {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final SystemInfo systemInfo = SystemInfo.getInstance();

private static final String TOTAL = "Total";
public static final String ON_HEAP = "OnHeap";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class StorageEngineMemoryMetrics implements IMetricSet {
private static final String STORAGE_ENGINE = "StorageEngine";
private static final String STORAGE_ENGINE_WRITE = "StorageEngine-Write";
private static final String STORAGE_ENGINE_WRITE_MEMTABLE = "StorageEngine-Write-Memtable";
private static final String STORAGE_ENGINE_WRITE_MEMTABLE_CACHE =
private static final String STORAGE_ENGINE_WRITE_MEMTABLE_DEVICE_PATH_CACHE =
"StorageEngine-Write-Memtable-DevicePathCache";
private static final String STORAGE_ENGINE_WRITE_MEMTABLE_BUFFERED_ARRAYS =
"StorageEngine-Write-Memtable-BufferedArrays";
Expand Down Expand Up @@ -113,7 +113,7 @@ public void bindTo(AbstractMetricService metricService) {
Metric.MEMORY_THRESHOLD_SIZE.toString(),
MetricLevel.NORMAL,
Tag.NAME.toString(),
STORAGE_ENGINE_WRITE_MEMTABLE_CACHE,
STORAGE_ENGINE_WRITE_MEMTABLE_DEVICE_PATH_CACHE,
Tag.TYPE.toString(),
GlobalMemoryMetrics.ON_HEAP,
Tag.LEVEL.toString(),
Expand Down Expand Up @@ -179,7 +179,7 @@ public void unbindFrom(AbstractMetricService metricService) {
Tag.LEVEL.toString(),
GlobalMemoryMetrics.LEVELS[3]));
Arrays.asList(
STORAGE_ENGINE_WRITE_MEMTABLE_CACHE,
STORAGE_ENGINE_WRITE_MEMTABLE_DEVICE_PATH_CACHE,
STORAGE_ENGINE_WRITE_MEMTABLE_BUFFERED_ARRAYS,
STORAGE_ENGINE_WRITE_MEMTABLE_WAL_BUFFER_QUEUE)
.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public synchronized void reduceMemoryUsage(long memoryInBytes) {
@Override
public synchronized void forceResize(long newSizeInBytes) {
throw new UnsupportedOperationException(
"resize is not supported for LoadTsFileDataCacheMemoryBlock");
"setTotalMemorySizeInBytes is not supported for LoadTsFileDataCacheMemoryBlock");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ synchronized void forceResize(LoadTsFileAnalyzeSchemaMemoryBlock memoryBlock, lo

if (memoryBlock.getMemoryUsageInBytes() > newSizeInBytes) {
LOGGER.error(
"Load: Failed to resize memory block {} to {} bytes, current memory usage {} bytes",
"Load: Failed to setTotalMemorySizeInBytes memory block {} to {} bytes, current memory usage {} bytes",
memoryBlock,
newSizeInBytes,
memoryBlock.getMemoryUsageInBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ public static void clearEnvironment() {
private static void changeSchemaCacheMemorySize(long size) {
MemoryManager memoryManager = config.getSchemaEngineMemoryManager();
MemoryManager schemaCacheMemoryManager = config.getSchemaCacheMemoryManager();
schemaCacheMemoryManager.clear();
memoryManager.removeChild("schemaCache");
schemaCacheMemoryManager = memoryManager.createMemoryManager("schemaCache", size);
schemaCacheMemoryManager.clearAll();
memoryManager.removeChildMemoryManager("schemaCache");
schemaCacheMemoryManager = memoryManager.getOrCreateMemoryManager("schemaCache", size);
config.setSchemaCacheMemoryManager(schemaCacheMemoryManager);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ public abstract class IMemoryBlock implements AutoCloseable {
/** The type of this memory block */
protected MemoryBlockType memoryBlockType;

/** The flag that indicates whether this memory block is released */
protected volatile boolean isReleased = false;

/** The maximum memory size in byte of this memory block */
protected long maxMemorySizeInByte = 0;

/** The memory usage in byte of this memory block */
protected final AtomicLong memoryUsageInBytes = new AtomicLong(0);

/** The flag that indicates whether this memory block is released */
protected volatile boolean isReleased = false;

/** Try to record memory managed by this memory block */
public abstract void recordMemory(final long size);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ public String toString() {
return "IoTDBMemoryBlock{"
+ "name="
+ name
+ ", isReleased="
+ isReleased
+ ", memoryBlockType="
+ memoryBlockType
+ ", maxMemorySizeInByte="
+ maxMemorySizeInByte
+ ", memoryUsageInBytes="
+ memoryUsageInBytes
+ ", isReleased="
+ isReleased
+ '}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

public enum MemoryBlockType {
NONE,
// function related memory
FUNCTION,
// performance related memory
PERFORMANCE,
}
Loading

0 comments on commit 002c97d

Please sign in to comment.