diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index cabc80a5aa19..478e0e783b09 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -521,9 +521,7 @@
io.jsonwebtoken:jjwt-impl
io.jsonwebtoken:jjwt-jackson
-
- org.apache.iotdb:metrics-core
-
+
org.glassfish.jersey.inject:jersey-hk2
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
index cff505d444e0..3daebdf0d84b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -210,6 +210,19 @@ public void markRegionCommit(final String pipeID, final boolean isDataRegion) {
}
}
+ public void markCollectInvocationCount(final String pipeID, final long collectInvocationCount) {
+ if (Objects.isNull(metricService)) {
+ return;
+ }
+ final PipeDataNodeRemainingEventAndTimeOperator operator =
+ remainingEventAndTimeOperatorMap.get(pipeID);
+ if (Objects.isNull(operator)) {
+ return;
+ }
+
+ operator.markCollectInvocationCount(collectInvocationCount);
+ }
+
//////////////////////////// Show pipes ////////////////////////////
public Pair getRemainingEventAndTime(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
index 8963c7019c29..e85a212b96d6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -26,6 +26,8 @@
import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
+import org.apache.iotdb.metrics.core.IoTDBMetricManager;
+import org.apache.iotdb.metrics.core.type.IoTDBHistogram;
import org.apache.iotdb.pipe.api.event.Event;
import com.codahale.metrics.Clock;
@@ -49,6 +51,8 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final AtomicReference dataRegionCommitMeter = new AtomicReference<>(null);
private final AtomicReference schemaRegionCommitMeter = new AtomicReference<>(null);
+ private final IoTDBHistogram collectInvocationHistogram =
+ (IoTDBHistogram) IoTDBMetricManager.getInstance().createHistogram(null);
private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE;
private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE;
@@ -85,12 +89,18 @@ long getRemainingEvents() {
final PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime =
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();
+ final double invocationValue = collectInvocationHistogram.getMean();
// Do not take heartbeat event into account
- final int totalDataRegionWriteEventCount =
- dataRegionExtractors.stream()
- .map(IoTDBDataRegionExtractor::getEventCount)
- .reduce(Integer::sum)
- .orElse(0)
+ final double totalDataRegionWriteEventCount =
+ (dataRegionExtractors.stream()
+ .map(IoTDBDataRegionExtractor::getEventCount)
+ .reduce(Integer::sum)
+ .orElse(0)
+ - dataRegionExtractors.stream()
+ .map(IoTDBDataRegionExtractor::getPipeHeartbeatEventCount)
+ .reduce(Integer::sum)
+ .orElse(0))
+ * Math.max(invocationValue, 1)
+ dataRegionProcessors.stream()
.map(processorSubtask -> processorSubtask.getEventCount(true))
.reduce(Integer::sum)
@@ -99,10 +109,6 @@ long getRemainingEvents() {
.map(connectorSubtask -> connectorSubtask.getEventCount(pipeName))
.reduce(Integer::sum)
.orElse(0)
- - dataRegionExtractors.stream()
- .map(IoTDBDataRegionExtractor::getPipeHeartbeatEventCount)
- .reduce(Integer::sum)
- .orElse(0)
- dataRegionConnectors.stream()
.map(PipeConnectorSubtask::getPipeHeartbeatEventCount)
.reduce(Integer::sum)
@@ -205,6 +211,11 @@ void markSchemaRegionCommit() {
});
}
+ void markCollectInvocationCount(final long collectInvocationCount) {
+ // If collectInvocationCount == 0, the event will still be committed once
+ collectInvocationHistogram.update(Math.max(collectInvocationCount, 1));
+ }
+
//////////////////////////// Switch ////////////////////////////
// Thread-safe & Idempotent
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index ddc7ee5a2d33..c97257cae61a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -174,6 +174,10 @@ public void resetCollectInvocationCount() {
collectInvocationCount.set(0);
}
+ public long getCollectInvocationCount() {
+ return collectInvocationCount.get();
+ }
+
public boolean hasNoCollectInvocationAfterReset() {
return collectInvocationCount.get() == 0;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 8ead33deb79d..96e0911af3a3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -136,9 +136,13 @@ protected boolean executeOnce() throws Exception {
if (event instanceof TabletInsertionEvent) {
pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
PipeProcessorMetrics.getInstance().markTabletEvent(taskID);
+ PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ .markCollectInvocationCount(taskID, outputEventCollector.getCollectInvocationCount());
} else if (event instanceof TsFileInsertionEvent) {
pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
PipeProcessorMetrics.getInstance().markTsFileEvent(taskID);
+ PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ .markCollectInvocationCount(taskID, outputEventCollector.getCollectInvocationCount());
} else if (event instanceof PipeHeartbeatEvent) {
pipeProcessor.process(event, outputEventCollector);
((PipeHeartbeatEvent) event).onProcessed();