From 1cb79885699e72b522c94700e4551593f4bf97bd Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 22 Oct 2024 19:15:51 +0800 Subject: [PATCH] Pipe: Reduce Calculations for PipeDataNodeRemainingEventAndTimeOperator when the events are too many (#13854) (#13870) --- .../PipeConnectorSubtaskManager.java | 6 -- .../processor/PipeProcessorSubtask.java | 6 +- .../common/heartbeat/PipeHeartbeatEvent.java | 15 ++- .../PipeInsertNodeTabletInsertionEvent.java | 10 ++ .../tablet/PipeRawTabletInsertionEvent.java | 9 ++ .../tsfile/PipeTsFileInsertionEvent.java | 10 ++ .../dataregion/IoTDBDataRegionExtractor.java | 2 - ...eDataNodeRemainingEventAndTimeMetrics.java | 56 +++++------ ...DataNodeRemainingEventAndTimeOperator.java | 95 +++++++------------ .../SubscriptionConnectorSubtaskManager.java | 3 - 10 files changed, 107 insertions(+), 105 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java index a156eae3b04d..1789d1611bb7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java @@ -29,7 +29,6 @@ import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.agent.task.execution.PipeConnectorSubtaskExecutor; -import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics; import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.pipe.api.PipeConnector; @@ -166,11 +165,6 @@ public synchronized String register( for (final PipeConnectorSubtaskLifeCycle lifeCycle : attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) { lifeCycle.register(); - if (isDataRegionConnector) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .register( - lifeCycle.getSubtask(), environment.getPipeName(), environment.getCreationTime()); - } } return attributeSortedString; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index 2b44a34ae99c..7f29b3c55f99 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -90,7 +90,6 @@ public PipeProcessorSubtask( // Only register dataRegions if (StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId))) { PipeProcessorMetrics.getInstance().register(this); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this); } } @@ -138,14 +137,11 @@ protected boolean executeOnce() throws Exception { if (event instanceof TabletInsertionEvent) { pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector); PipeProcessorMetrics.getInstance().markTabletEvent(taskID); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .markCollectInvocationCount( - pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount()); } else if (event instanceof TsFileInsertionEvent) { pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector); PipeProcessorMetrics.getInstance().markTsFileEvent(taskID); PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .markCollectInvocationCount( + .markTsFileCollectInvocationCount( pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount()); } else if (event instanceof PipeHeartbeatEvent) { pipeProcessor.process(event, outputEventCollector); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java index 5a72be19bc86..e64c3e12cd68 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics; import org.apache.iotdb.db.pipe.metric.PipeHeartbeatEventMetrics; import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.pipe.api.event.Event; @@ -33,6 +34,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Objects; + public class PipeHeartbeatEvent extends EnrichedEvent { private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatEvent.class); @@ -79,6 +82,10 @@ public PipeHeartbeatEvent( @Override public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) { + if (Objects.nonNull(pipeName)) { + PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + .increaseHeartbeatEventCount(pipeName + "_" + creationTime); + } return true; } @@ -86,8 +93,12 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) { // PipeName == null indicates that the event is the raw event at disruptor, // not the event copied and passed to the extractor - if (shouldPrintMessage && pipeName != null && LOGGER.isDebugEnabled()) { - LOGGER.debug(this.toString()); + if (Objects.nonNull(pipeName)) { + PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + .decreaseHeartbeatEventCount(pipeName + "_" + creationTime); + if (shouldPrintMessage && LOGGER.isDebugEnabled()) { + LOGGER.debug(this.toString()); + } } return true; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index d2b6e0c8a32f..bd01f26b6556 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; @@ -133,6 +134,10 @@ public String getDeviceId() { public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) { try { PipeDataNodeResourceManager.wal().pin(walEntryHandler); + if (Objects.nonNull(pipeName)) { + PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + .increaseInsertionEventCount(pipeName + "_" + creationTime); + } return true; } catch (final Exception e) { LOGGER.warn( @@ -161,6 +166,11 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa walEntryHandler.getMemTableId(), holderMessage), e); return false; + } finally { + if (Objects.nonNull(pipeName)) { + PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + .decreaseInsertionEventCount(pipeName + "_" + creationTime); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 6b85bae9d9fc..a5795ede49c7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock; @@ -115,11 +116,19 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry( PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet)); + if (Objects.nonNull(pipeName)) { + PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + .increaseInsertionEventCount(pipeName + "_" + creationTime); + } return true; } @Override public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) { + if (Objects.nonNull(pipeName)) { + PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + .decreaseInsertionEventCount(pipeName + "_" + creationTime); + } allocatedMemoryBlock.close(); // Record the deviceId before the memory is released, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index aa170be2d9fa..9e7a8d421e36 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer; import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainerProvider; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTimePartitionProgressIndexKeeper; +import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; @@ -244,6 +245,10 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa if (isWithMod) { modFile = PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, null); } + if (Objects.nonNull(pipeName)) { + PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + .increaseTsFileEventCount(pipeName + "_" + creationTime); + } return true; } catch (final Exception e) { LOGGER.warn( @@ -270,6 +275,11 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa tsFile.getPath(), holderMessage), e); return false; + } finally { + if (Objects.nonNull(pipeName)) { + PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + .decreaseTsFileEventCount(pipeName + "_" + creationTime); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index f05bcf2bea5e..a2d91be2ab84 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -34,7 +34,6 @@ import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor; -import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics; import org.apache.iotdb.db.pipe.metric.PipeDataRegionExtractorMetrics; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; @@ -343,7 +342,6 @@ public void customize( // register metric after generating taskID PipeDataRegionExtractorMetrics.getInstance().register(this); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java index 597c62cb7da1..a8765b3b61dd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java @@ -22,9 +22,6 @@ import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager; import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; -import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask; -import org.apache.iotdb.db.pipe.agent.task.subtask.processor.PipeProcessorSubtask; -import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor; import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor; import org.apache.iotdb.metrics.AbstractMetricService; import org.apache.iotdb.metrics.metricsets.IMetricSet; @@ -121,7 +118,7 @@ private void removeAutoGauge(final String pipeID) { //////////////////////////// register & deregister (pipe integration) //////////////////////////// - public void register(final IoTDBDataRegionExtractor extractor) { + public void register(final IoTDBSchemaRegionExtractor extractor) { // The metric is global thus the regionId is omitted final String pipeID = extractor.getPipeName() + "_" + extractor.getCreationTime(); remainingEventAndTimeOperatorMap @@ -132,38 +129,40 @@ public void register(final IoTDBDataRegionExtractor extractor) { } } - public void register(final PipeProcessorSubtask processorSubtask) { - // The metric is global thus the regionId is omitted - final String pipeID = processorSubtask.getPipeName() + "_" + processorSubtask.getCreationTime(); + public void increaseInsertionEventCount(final String pipeID) { remainingEventAndTimeOperatorMap .computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator()) - .register(processorSubtask); - if (Objects.nonNull(metricService)) { - createMetrics(pipeID); - } + .increaseInsertionEventCount(); } - public void register( - final PipeConnectorSubtask connectorSubtask, final String pipeName, final long creationTime) { - // The metric is global thus the regionId is omitted - final String pipeID = pipeName + "_" + creationTime; + public void decreaseInsertionEventCount(final String pipeID) { remainingEventAndTimeOperatorMap .computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator()) - .register(connectorSubtask, pipeName, creationTime); - if (Objects.nonNull(metricService)) { - createMetrics(pipeID); - } + .decreaseInsertionEventCount(); } - public void register(final IoTDBSchemaRegionExtractor extractor) { - // The metric is global thus the regionId is omitted - final String pipeID = extractor.getPipeName() + "_" + extractor.getCreationTime(); + public void increaseTsFileEventCount(final String pipeID) { remainingEventAndTimeOperatorMap .computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator()) - .register(extractor); - if (Objects.nonNull(metricService)) { - createMetrics(pipeID); - } + .increaseTsFileEventCount(); + } + + public void decreaseTsFileEventCount(final String pipeID) { + remainingEventAndTimeOperatorMap + .computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator()) + .decreaseTsFileEventCount(); + } + + public void increaseHeartbeatEventCount(final String pipeID) { + remainingEventAndTimeOperatorMap + .computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator()) + .increaseHeartbeatEventCount(); + } + + public void decreaseHeartbeatEventCount(final String pipeID) { + remainingEventAndTimeOperatorMap + .computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator()) + .decreaseHeartbeatEventCount(); } public void thawRate(final String pipeID) { @@ -218,7 +217,8 @@ public void markRegionCommit(final String pipeID, final boolean isDataRegion) { } } - public void markCollectInvocationCount(final String pipeID, final long collectInvocationCount) { + public void markTsFileCollectInvocationCount( + final String pipeID, final long collectInvocationCount) { if (Objects.isNull(metricService)) { return; } @@ -228,7 +228,7 @@ public void markCollectInvocationCount(final String pipeID, final long collectIn return; } - operator.markCollectInvocationCount(collectInvocationCount); + operator.markTsFileCollectInvocationCount(collectInvocationCount); } //////////////////////////// Show pipes //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java index bf0c4ba2386e..c00722151462 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java @@ -22,9 +22,6 @@ import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator; -import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask; -import org.apache.iotdb.db.pipe.agent.task.subtask.processor.PipeProcessorSubtask; -import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor; import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor; import org.apache.iotdb.metrics.core.IoTDBMetricManager; import org.apache.iotdb.metrics.core.type.IoTDBHistogram; @@ -38,17 +35,19 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator { - private final Set dataRegionExtractors = - Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final Set dataRegionProcessors = - Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final Set dataRegionConnectors = - Collections.newSetFromMap(new ConcurrentHashMap<>()); + + // Calculate from schema region extractors directly for it requires less computation private final Set schemaRegionExtractors = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + private final AtomicInteger insertionEventCount = new AtomicInteger(0); + private final AtomicInteger tsfileEventCount = new AtomicInteger(0); + private final AtomicInteger heartbeatEventCount = new AtomicInteger(0); + private final AtomicReference dataRegionCommitMeter = new AtomicReference<>(null); private final AtomicReference schemaRegionCommitMeter = new AtomicReference<>(null); private final IoTDBHistogram collectInvocationHistogram = @@ -59,19 +58,33 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator { //////////////////////////// Remaining event & time calculation //////////////////////////// + void increaseInsertionEventCount() { + tsfileEventCount.incrementAndGet(); + } + + void decreaseInsertionEventCount() { + tsfileEventCount.decrementAndGet(); + } + + void increaseTsFileEventCount() { + tsfileEventCount.incrementAndGet(); + } + + void decreaseTsFileEventCount() { + tsfileEventCount.decrementAndGet(); + } + + void increaseHeartbeatEventCount() { + heartbeatEventCount.incrementAndGet(); + } + + void decreaseHeartbeatEventCount() { + heartbeatEventCount.decrementAndGet(); + } + long getRemainingEvents() { - return dataRegionExtractors.stream() - .map(IoTDBDataRegionExtractor::getEventCount) - .reduce(Integer::sum) - .orElse(0) - + dataRegionProcessors.stream() - .map(processorSubtask -> processorSubtask.getEventCount(false)) - .reduce(Integer::sum) - .orElse(0) - + dataRegionConnectors.stream() - .map(connectorSubtask -> connectorSubtask.getEventCount(pipeName)) - .reduce(Integer::sum) - .orElse(0) + return tsfileEventCount.get() + + heartbeatEventCount.get() + schemaRegionExtractors.stream() .map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount) .reduce(Long::sum) @@ -92,27 +105,7 @@ long getRemainingEvents() { final double invocationValue = collectInvocationHistogram.getMean(); // Do not take heartbeat event into account final double totalDataRegionWriteEventCount = - (dataRegionExtractors.stream() - .map(IoTDBDataRegionExtractor::getEventCount) - .reduce(Integer::sum) - .orElse(0) - - dataRegionExtractors.stream() - .map(IoTDBDataRegionExtractor::getPipeHeartbeatEventCount) - .reduce(Integer::sum) - .orElse(0)) - * Math.max(invocationValue, 1) - + dataRegionProcessors.stream() - .map(processorSubtask -> processorSubtask.getEventCount(true)) - .reduce(Integer::sum) - .orElse(0) - + dataRegionConnectors.stream() - .map(connectorSubtask -> connectorSubtask.getEventCount(pipeName)) - .reduce(Integer::sum) - .orElse(0) - - dataRegionConnectors.stream() - .map(PipeConnectorSubtask::getPipeHeartbeatEventCount) - .reduce(Integer::sum) - .orElse(0); + tsfileEventCount.get() * Math.max(invocationValue, 1) + insertionEventCount.get(); dataRegionCommitMeter.updateAndGet( meter -> { @@ -168,22 +161,6 @@ long getRemainingEvents() { //////////////////////////// Register & deregister (pipe integration) //////////////////////////// - void register(final IoTDBDataRegionExtractor extractor) { - setNameAndCreationTime(extractor.getPipeName(), extractor.getCreationTime()); - dataRegionExtractors.add(extractor); - } - - void register(final PipeProcessorSubtask processorSubtask) { - setNameAndCreationTime(processorSubtask.getPipeName(), processorSubtask.getCreationTime()); - dataRegionProcessors.add(processorSubtask); - } - - void register( - final PipeConnectorSubtask connectorSubtask, final String pipeName, final long creationTime) { - setNameAndCreationTime(pipeName, creationTime); - dataRegionConnectors.add(connectorSubtask); - } - void register(final IoTDBSchemaRegionExtractor extractor) { setNameAndCreationTime(extractor.getPipeName(), extractor.getCreationTime()); schemaRegionExtractors.add(extractor); @@ -211,7 +188,7 @@ void markSchemaRegionCommit() { }); } - void markCollectInvocationCount(final long collectInvocationCount) { + void markTsFileCollectInvocationCount(final long collectInvocationCount) { // If collectInvocationCount == 0, the event will still be committed once collectInvocationHistogram.update(Math.max(collectInvocationCount, 1)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java index e0707ebf5473..dfce62faac8b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java @@ -31,7 +31,6 @@ import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask; import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtaskLifeCycle; import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeRealtimePriorityBlockingQueue; -import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics; import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; @@ -158,8 +157,6 @@ public synchronized String register( final PipeConnectorSubtaskLifeCycle lifeCycle = attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString); lifeCycle.register(); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .register(lifeCycle.getSubtask(), environment.getPipeName(), environment.getCreationTime()); return attributeSortedString; }