Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[To rc/1.3.3] Pipe: Reduce Calculations for PipeDataNodeRemainingEventAndTimeOperator when the events are too many (#13854) #13870

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.execution.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.pipe.api.PipeConnector;
Expand Down Expand Up @@ -166,11 +165,6 @@ public synchronized String register(
for (final PipeConnectorSubtaskLifeCycle lifeCycle :
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
lifeCycle.register();
if (isDataRegionConnector) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.register(
lifeCycle.getSubtask(), environment.getPipeName(), environment.getCreationTime());
}
}

return attributeSortedString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ public PipeProcessorSubtask(
// Only register dataRegions
if (StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId))) {
PipeProcessorMetrics.getInstance().register(this);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
}
}

Expand Down Expand Up @@ -138,14 +137,11 @@ protected boolean executeOnce() throws Exception {
if (event instanceof TabletInsertionEvent) {
pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
PipeProcessorMetrics.getInstance().markTabletEvent(taskID);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.markCollectInvocationCount(
pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount());
} else if (event instanceof TsFileInsertionEvent) {
pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
PipeProcessorMetrics.getInstance().markTsFileEvent(taskID);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.markCollectInvocationCount(
.markTsFileCollectInvocationCount(
pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount());
} else if (event instanceof PipeHeartbeatEvent) {
pipeProcessor.process(event, outputEventCollector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.PipeHeartbeatEventMetrics;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.event.Event;
Expand All @@ -33,6 +34,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

public class PipeHeartbeatEvent extends EnrichedEvent {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatEvent.class);
Expand Down Expand Up @@ -79,15 +82,23 @@ public PipeHeartbeatEvent(

@Override
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.increaseHeartbeatEventCount(pipeName + "_" + creationTime);
}
return true;
}

@Override
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
// PipeName == null indicates that the event is the raw event at disruptor,
// not the event copied and passed to the extractor
if (shouldPrintMessage && pipeName != null && LOGGER.isDebugEnabled()) {
LOGGER.debug(this.toString());
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.decreaseHeartbeatEventCount(pipeName + "_" + creationTime);
if (shouldPrintMessage && LOGGER.isDebugEnabled()) {
LOGGER.debug(this.toString());
}
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
Expand Down Expand Up @@ -133,6 +134,10 @@ public String getDeviceId() {
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
try {
PipeDataNodeResourceManager.wal().pin(walEntryHandler);
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.increaseInsertionEventCount(pipeName + "_" + creationTime);
}
return true;
} catch (final Exception e) {
LOGGER.warn(
Expand Down Expand Up @@ -161,6 +166,11 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
walEntryHandler.getMemTableId(), holderMessage),
e);
return false;
} finally {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.decreaseInsertionEventCount(pipeName + "_" + creationTime);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
Expand Down Expand Up @@ -115,11 +116,19 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet));
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.increaseInsertionEventCount(pipeName + "_" + creationTime);
}
return true;
}

@Override
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.decreaseInsertionEventCount(pipeName + "_" + creationTime);
}
allocatedMemoryBlock.close();

// Record the deviceId before the memory is released,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainerProvider;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTimePartitionProgressIndexKeeper;
import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
Expand Down Expand Up @@ -244,6 +245,10 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
if (isWithMod) {
modFile = PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, null);
}
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.increaseTsFileEventCount(pipeName + "_" + creationTime);
}
return true;
} catch (final Exception e) {
LOGGER.warn(
Expand All @@ -270,6 +275,11 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
tsFile.getPath(), holderMessage),
e);
return false;
} finally {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.decreaseTsFileEventCount(pipeName + "_" + creationTime);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
Expand Down Expand Up @@ -343,7 +342,6 @@ public void customize(

// register metric after generating taskID
PipeDataRegionExtractorMetrics.getInstance().register(this);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.db.pipe.agent.task.subtask.processor.PipeProcessorSubtask;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
Expand Down Expand Up @@ -121,7 +118,7 @@ private void removeAutoGauge(final String pipeID) {

//////////////////////////// register & deregister (pipe integration) ////////////////////////////

public void register(final IoTDBDataRegionExtractor extractor) {
public void register(final IoTDBSchemaRegionExtractor extractor) {
// The metric is global thus the regionId is omitted
final String pipeID = extractor.getPipeName() + "_" + extractor.getCreationTime();
remainingEventAndTimeOperatorMap
Expand All @@ -132,38 +129,40 @@ public void register(final IoTDBDataRegionExtractor extractor) {
}
}

public void register(final PipeProcessorSubtask processorSubtask) {
// The metric is global thus the regionId is omitted
final String pipeID = processorSubtask.getPipeName() + "_" + processorSubtask.getCreationTime();
public void increaseInsertionEventCount(final String pipeID) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.register(processorSubtask);
if (Objects.nonNull(metricService)) {
createMetrics(pipeID);
}
.increaseInsertionEventCount();
}

public void register(
final PipeConnectorSubtask connectorSubtask, final String pipeName, final long creationTime) {
// The metric is global thus the regionId is omitted
final String pipeID = pipeName + "_" + creationTime;
public void decreaseInsertionEventCount(final String pipeID) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.register(connectorSubtask, pipeName, creationTime);
if (Objects.nonNull(metricService)) {
createMetrics(pipeID);
}
.decreaseInsertionEventCount();
}

public void register(final IoTDBSchemaRegionExtractor extractor) {
// The metric is global thus the regionId is omitted
final String pipeID = extractor.getPipeName() + "_" + extractor.getCreationTime();
public void increaseTsFileEventCount(final String pipeID) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.register(extractor);
if (Objects.nonNull(metricService)) {
createMetrics(pipeID);
}
.increaseTsFileEventCount();
}

public void decreaseTsFileEventCount(final String pipeID) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.decreaseTsFileEventCount();
}

public void increaseHeartbeatEventCount(final String pipeID) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.increaseHeartbeatEventCount();
}

public void decreaseHeartbeatEventCount(final String pipeID) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.decreaseHeartbeatEventCount();
}

public void thawRate(final String pipeID) {
Expand Down Expand Up @@ -218,7 +217,8 @@ public void markRegionCommit(final String pipeID, final boolean isDataRegion) {
}
}

public void markCollectInvocationCount(final String pipeID, final long collectInvocationCount) {
public void markTsFileCollectInvocationCount(
final String pipeID, final long collectInvocationCount) {
if (Objects.isNull(metricService)) {
return;
}
Expand All @@ -228,7 +228,7 @@ public void markCollectInvocationCount(final String pipeID, final long collectIn
return;
}

operator.markCollectInvocationCount(collectInvocationCount);
operator.markTsFileCollectInvocationCount(collectInvocationCount);
}

//////////////////////////// Show pipes ////////////////////////////
Expand Down
Loading
Loading