Skip to content

Commit

Permalink
Pipe: Reduce Calculations for PipeDataNodeRemainingEventAndTimeOperat…
Browse files Browse the repository at this point in the history
…or when the events are too many (#13854) (#13870)
  • Loading branch information
Caideyipi authored Oct 22, 2024
1 parent fe6c914 commit 1cb7988
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.execution.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.pipe.api.PipeConnector;
Expand Down Expand Up @@ -166,11 +165,6 @@ public synchronized String register(
for (final PipeConnectorSubtaskLifeCycle lifeCycle :
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
lifeCycle.register();
if (isDataRegionConnector) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.register(
lifeCycle.getSubtask(), environment.getPipeName(), environment.getCreationTime());
}
}

return attributeSortedString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ public PipeProcessorSubtask(
// Only register dataRegions
if (StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId))) {
PipeProcessorMetrics.getInstance().register(this);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
}
}

Expand Down Expand Up @@ -138,14 +137,11 @@ protected boolean executeOnce() throws Exception {
if (event instanceof TabletInsertionEvent) {
pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
PipeProcessorMetrics.getInstance().markTabletEvent(taskID);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.markCollectInvocationCount(
pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount());
} else if (event instanceof TsFileInsertionEvent) {
pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
PipeProcessorMetrics.getInstance().markTsFileEvent(taskID);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.markCollectInvocationCount(
.markTsFileCollectInvocationCount(
pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount());
} else if (event instanceof PipeHeartbeatEvent) {
pipeProcessor.process(event, outputEventCollector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.PipeHeartbeatEventMetrics;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.event.Event;
Expand All @@ -33,6 +34,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

public class PipeHeartbeatEvent extends EnrichedEvent {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatEvent.class);
Expand Down Expand Up @@ -79,15 +82,23 @@ public PipeHeartbeatEvent(

@Override
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.increaseHeartbeatEventCount(pipeName + "_" + creationTime);
}
return true;
}

@Override
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
// PipeName == null indicates that the event is the raw event at disruptor,
// not the event copied and passed to the extractor
if (shouldPrintMessage && pipeName != null && LOGGER.isDebugEnabled()) {
LOGGER.debug(this.toString());
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.decreaseHeartbeatEventCount(pipeName + "_" + creationTime);
if (shouldPrintMessage && LOGGER.isDebugEnabled()) {
LOGGER.debug(this.toString());
}
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
Expand Down Expand Up @@ -133,6 +134,10 @@ public String getDeviceId() {
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
try {
PipeDataNodeResourceManager.wal().pin(walEntryHandler);
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.increaseInsertionEventCount(pipeName + "_" + creationTime);
}
return true;
} catch (final Exception e) {
LOGGER.warn(
Expand Down Expand Up @@ -161,6 +166,11 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
walEntryHandler.getMemTableId(), holderMessage),
e);
return false;
} finally {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.decreaseInsertionEventCount(pipeName + "_" + creationTime);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
Expand Down Expand Up @@ -115,11 +116,19 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet));
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.increaseInsertionEventCount(pipeName + "_" + creationTime);
}
return true;
}

@Override
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.decreaseInsertionEventCount(pipeName + "_" + creationTime);
}
allocatedMemoryBlock.close();

// Record the deviceId before the memory is released,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainerProvider;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTimePartitionProgressIndexKeeper;
import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
Expand Down Expand Up @@ -244,6 +245,10 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
if (isWithMod) {
modFile = PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, null);
}
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.increaseTsFileEventCount(pipeName + "_" + creationTime);
}
return true;
} catch (final Exception e) {
LOGGER.warn(
Expand All @@ -270,6 +275,11 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
tsFile.getPath(), holderMessage),
e);
return false;
} finally {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.decreaseTsFileEventCount(pipeName + "_" + creationTime);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
Expand Down Expand Up @@ -343,7 +342,6 @@ public void customize(

// register metric after generating taskID
PipeDataRegionExtractorMetrics.getInstance().register(this);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.db.pipe.agent.task.subtask.processor.PipeProcessorSubtask;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
Expand Down Expand Up @@ -121,7 +118,7 @@ private void removeAutoGauge(final String pipeID) {

//////////////////////////// register & deregister (pipe integration) ////////////////////////////

public void register(final IoTDBDataRegionExtractor extractor) {
public void register(final IoTDBSchemaRegionExtractor extractor) {
// The metric is global thus the regionId is omitted
final String pipeID = extractor.getPipeName() + "_" + extractor.getCreationTime();
remainingEventAndTimeOperatorMap
Expand All @@ -132,38 +129,40 @@ public void register(final IoTDBDataRegionExtractor extractor) {
}
}

public void register(final PipeProcessorSubtask processorSubtask) {
// The metric is global thus the regionId is omitted
final String pipeID = processorSubtask.getPipeName() + "_" + processorSubtask.getCreationTime();
public void increaseInsertionEventCount(final String pipeID) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.register(processorSubtask);
if (Objects.nonNull(metricService)) {
createMetrics(pipeID);
}
.increaseInsertionEventCount();
}

public void register(
final PipeConnectorSubtask connectorSubtask, final String pipeName, final long creationTime) {
// The metric is global thus the regionId is omitted
final String pipeID = pipeName + "_" + creationTime;
public void decreaseInsertionEventCount(final String pipeID) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.register(connectorSubtask, pipeName, creationTime);
if (Objects.nonNull(metricService)) {
createMetrics(pipeID);
}
.decreaseInsertionEventCount();
}

public void register(final IoTDBSchemaRegionExtractor extractor) {
// The metric is global thus the regionId is omitted
final String pipeID = extractor.getPipeName() + "_" + extractor.getCreationTime();
public void increaseTsFileEventCount(final String pipeID) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.register(extractor);
if (Objects.nonNull(metricService)) {
createMetrics(pipeID);
}
.increaseTsFileEventCount();
}

public void decreaseTsFileEventCount(final String pipeID) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.decreaseTsFileEventCount();
}

public void increaseHeartbeatEventCount(final String pipeID) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.increaseHeartbeatEventCount();
}

public void decreaseHeartbeatEventCount(final String pipeID) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.decreaseHeartbeatEventCount();
}

public void thawRate(final String pipeID) {
Expand Down Expand Up @@ -218,7 +217,8 @@ public void markRegionCommit(final String pipeID, final boolean isDataRegion) {
}
}

public void markCollectInvocationCount(final String pipeID, final long collectInvocationCount) {
public void markTsFileCollectInvocationCount(
final String pipeID, final long collectInvocationCount) {
if (Objects.isNull(metricService)) {
return;
}
Expand All @@ -228,7 +228,7 @@ public void markCollectInvocationCount(final String pipeID, final long collectIn
return;
}

operator.markCollectInvocationCount(collectInvocationCount);
operator.markTsFileCollectInvocationCount(collectInvocationCount);
}

//////////////////////////// Show pipes ////////////////////////////
Expand Down
Loading

0 comments on commit 1cb7988

Please sign in to comment.