Skip to content

Commit

Permalink
Pipe: Added collect invocation count in the caculation of data region…
Browse files Browse the repository at this point in the history
… extractor events for data node remaining time (#12799)
  • Loading branch information
Caideyipi authored Jun 25, 2024
1 parent 0afc1be commit 86aa711
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 12 deletions.
4 changes: 1 addition & 3 deletions iotdb-core/datanode/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -521,9 +521,7 @@
<!-- These are used at runtime in tests -->
<usedDependency>io.jsonwebtoken:jjwt-impl</usedDependency>
<usedDependency>io.jsonwebtoken:jjwt-jackson</usedDependency>
<!-- We need this dependency as it provides the metric managers used in tests -->
<usedDependency>org.apache.iotdb:metrics-core</usedDependency>
<!-- This dependency is required at runtime, when esnabling the rest service -->
<!-- This dependency is required at runtime, when enabling the rest service -->
<usedDependency>org.glassfish.jersey.inject:jersey-hk2</usedDependency>
</usedDependencies>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,19 @@ public void markRegionCommit(final String pipeID, final boolean isDataRegion) {
}
}

public void markCollectInvocationCount(final String pipeID, final long collectInvocationCount) {
if (Objects.isNull(metricService)) {
return;
}
final PipeDataNodeRemainingEventAndTimeOperator operator =
remainingEventAndTimeOperatorMap.get(pipeID);
if (Objects.isNull(operator)) {
return;
}

operator.markCollectInvocationCount(collectInvocationCount);
}

//////////////////////////// Show pipes ////////////////////////////

public Pair<Long, Double> getRemainingEventAndTime(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
import org.apache.iotdb.metrics.core.IoTDBMetricManager;
import org.apache.iotdb.metrics.core.type.IoTDBHistogram;
import org.apache.iotdb.pipe.api.event.Event;

import com.codahale.metrics.Clock;
Expand All @@ -49,6 +51,8 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final AtomicReference<Meter> dataRegionCommitMeter = new AtomicReference<>(null);
private final AtomicReference<Meter> schemaRegionCommitMeter = new AtomicReference<>(null);
private final IoTDBHistogram collectInvocationHistogram =
(IoTDBHistogram) IoTDBMetricManager.getInstance().createHistogram(null);

private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE;
private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE;
Expand Down Expand Up @@ -85,12 +89,18 @@ long getRemainingEvents() {
final PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime =
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();

final double invocationValue = collectInvocationHistogram.getMean();
// Do not take heartbeat event into account
final int totalDataRegionWriteEventCount =
dataRegionExtractors.stream()
.map(IoTDBDataRegionExtractor::getEventCount)
.reduce(Integer::sum)
.orElse(0)
final double totalDataRegionWriteEventCount =
(dataRegionExtractors.stream()
.map(IoTDBDataRegionExtractor::getEventCount)
.reduce(Integer::sum)
.orElse(0)
- dataRegionExtractors.stream()
.map(IoTDBDataRegionExtractor::getPipeHeartbeatEventCount)
.reduce(Integer::sum)
.orElse(0))
* Math.max(invocationValue, 1)
+ dataRegionProcessors.stream()
.map(processorSubtask -> processorSubtask.getEventCount(true))
.reduce(Integer::sum)
Expand All @@ -99,10 +109,6 @@ long getRemainingEvents() {
.map(connectorSubtask -> connectorSubtask.getEventCount(pipeName))
.reduce(Integer::sum)
.orElse(0)
- dataRegionExtractors.stream()
.map(IoTDBDataRegionExtractor::getPipeHeartbeatEventCount)
.reduce(Integer::sum)
.orElse(0)
- dataRegionConnectors.stream()
.map(PipeConnectorSubtask::getPipeHeartbeatEventCount)
.reduce(Integer::sum)
Expand Down Expand Up @@ -205,6 +211,11 @@ void markSchemaRegionCommit() {
});
}

void markCollectInvocationCount(final long collectInvocationCount) {
// If collectInvocationCount == 0, the event will still be committed once
collectInvocationHistogram.update(Math.max(collectInvocationCount, 1));
}

//////////////////////////// Switch ////////////////////////////

// Thread-safe & Idempotent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ public void resetCollectInvocationCount() {
collectInvocationCount.set(0);
}

public long getCollectInvocationCount() {
return collectInvocationCount.get();
}

public boolean hasNoCollectInvocationAfterReset() {
return collectInvocationCount.get() == 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,13 @@ protected boolean executeOnce() throws Exception {
if (event instanceof TabletInsertionEvent) {
pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
PipeProcessorMetrics.getInstance().markTabletEvent(taskID);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.markCollectInvocationCount(taskID, outputEventCollector.getCollectInvocationCount());
} else if (event instanceof TsFileInsertionEvent) {
pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
PipeProcessorMetrics.getInstance().markTsFileEvent(taskID);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.markCollectInvocationCount(taskID, outputEventCollector.getCollectInvocationCount());
} else if (event instanceof PipeHeartbeatEvent) {
pipeProcessor.process(event, outputEventCollector);
((PipeHeartbeatEvent) event).onProcessed();
Expand Down

0 comments on commit 86aa711

Please sign in to comment.