Skip to content

Commit

Permalink
[INLONG-11680][SDK] Optimize metric-related implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
gosonzhang committed Jan 22, 2025
1 parent af502f8 commit dc91b33
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public class MetaSyncInfo {

// meta error info
private final Map<Integer, LongAdder> syncErrInfo = new ConcurrentHashMap<>();
// msMs
private final TimeWastInfo syncWastMs = new TimeWastInfo("msMs");
// msMs: meta sync cost in Ms
private final TimeCostInfo syncCostMs = new TimeCostInfo("msMs");

public MetaSyncInfo() {
}
Expand All @@ -41,13 +41,13 @@ public void addSucMsgInfo(int errCode, long wastMs) {
}
}
longCount.increment();
syncWastMs.addTimeWastMs(wastMs);
syncCostMs.addTimeCostInMs(wastMs);
}

public void getAndResetValue(StringBuilder strBuff) {
if (syncErrInfo.isEmpty()) {
strBuff.append("\"mSync\":{\"errT\":{},");
syncWastMs.getAndResetValue(strBuff);
syncCostMs.getAndResetValue(strBuff);
strBuff.append("}");
} else {
long curCnt = 0;
Expand All @@ -59,14 +59,14 @@ public void getAndResetValue(StringBuilder strBuff) {
strBuff.append("\"e").append(entry.getKey()).append("\":").append(entry.getValue());
}
strBuff.append("},");
syncWastMs.getAndResetValue(strBuff);
syncCostMs.getAndResetValue(strBuff);
strBuff.append("}");
syncErrInfo.clear();
}
}

public void clear() {
syncWastMs.clear();
syncCostMs.clear();
syncErrInfo.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void run() {
outputMetricData(startTime, getAndIncIndex());
long dltTime = System.currentTimeMillis() - startTime;
if (dltTime > this.metricConfig.getMetricOutWarnIntMs()) {
logger.warn("Metric DataHolder({}) snapshot finished, wast={}ms!",
logger.warn("Metric DataHolder({}) snapshot finished, cost = {} ms!",
this.sender.getSenderId(), dltTime);
}
this.lstReportTime = startTime;
Expand All @@ -90,32 +90,32 @@ public void close() {
outputMetricData(startTime, getOldIndex());
outputMetricData(startTime, getCurIndex());
this.started = false;
logger.info("Metric DataHolder({}) closed, wast{}ms!",
logger.info("Metric DataHolder({}) closed, cost = {} ms!",
this.sender.getSenderId(), System.currentTimeMillis() - startTime);
}

public void addMetaSyncMetric(int errCode, long syncWastMs) {
public void addMetaSyncMetric(int errCode, long syncCostMs) {
if (!this.started || !this.metricConfig.isEnableMetric()) {
return;
}
MetricInfoUnit selectedUnit = metricUnits[itemIndex];
selectedUnit.refCnt.incrementAndGet();
try {
selectedUnit.metaSyncInfo.addSucMsgInfo(errCode, syncWastMs);
selectedUnit.metaSyncInfo.addSucMsgInfo(errCode, syncCostMs);
} finally {
selectedUnit.refCnt.decrementAndGet();
}
}

public void addSucMetric(String groupId, String streamId, int msgCnt, long wastMs) {
public void addSucMetric(String groupId, String streamId, int msgCnt, long costMs) {
if (!this.started || !this.metricConfig.isEnableMetric()) {
return;
}
MetricInfoUnit selectedUnit = metricUnits[itemIndex];
selectedUnit.refCnt.incrementAndGet();
try {
selectedUnit.addSucMsgInfo(groupId,
(this.metricConfig.isMaskStreamId() ? "" : streamId), msgCnt, wastMs);
(this.metricConfig.isMaskStreamId() ? "" : streamId), msgCnt, costMs);
} finally {
selectedUnit.refCnt.decrementAndGet();
}
Expand All @@ -135,29 +135,29 @@ public void addFailMetric(int errCode, String groupId, String streamId, int msgC
}
}

public void addCallbackSucMetric(String groupId, String streamId, int msgCnt, long wastMs, long callDurMs) {
public void addCallbackSucMetric(String groupId, String streamId, int msgCnt, long costMs, long callDurMs) {
if (!this.started || !this.metricConfig.isEnableMetric()) {
return;
}
MetricInfoUnit selectedUnit = metricUnits[itemIndex];
selectedUnit.refCnt.incrementAndGet();
try {
selectedUnit.addSucMsgInfo(groupId,
(this.metricConfig.isMaskStreamId() ? "" : streamId), msgCnt, wastMs, callDurMs);
(this.metricConfig.isMaskStreamId() ? "" : streamId), msgCnt, costMs, callDurMs);
} finally {
selectedUnit.refCnt.decrementAndGet();
}
}

public void addCallbackFailMetric(int errCode, String groupId, String streamId, int msgCnt, long wastMs) {
public void addCallbackFailMetric(int errCode, String groupId, String streamId, int msgCnt, long costMs) {
if (!this.started || !this.metricConfig.isEnableMetric()) {
return;
}
MetricInfoUnit selectedUnit = metricUnits[itemIndex];
selectedUnit.refCnt.incrementAndGet();
try {
selectedUnit.addFailMsgInfo(groupId,
(this.metricConfig.isMaskStreamId() ? "" : streamId), msgCnt, errCode, wastMs);
(this.metricConfig.isMaskStreamId() ? "" : streamId), msgCnt, errCode, costMs);
} finally {
selectedUnit.refCnt.decrementAndGet();
}
Expand Down Expand Up @@ -209,7 +209,7 @@ private static class MetricInfoUnit {
protected final ConcurrentHashMap<String, TrafficInfo> trafficMap = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<Integer, LongAdder> errCodeMap = new ConcurrentHashMap<>();

public void addSucMsgInfo(String groupId, String streamId, int msgCnt, long wastMs) {
public void addSucMsgInfo(String groupId, String streamId, int msgCnt, long costMs) {
String recordKey = getKeyStringByConfig(groupId, streamId);
TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
if (trafficInfo == null) {
Expand All @@ -219,10 +219,10 @@ public void addSucMsgInfo(String groupId, String streamId, int msgCnt, long wast
trafficInfo = tmpInfo;
}
}
trafficInfo.addSucMsgInfo(msgCnt, wastMs);
trafficInfo.addSucMsgInfo(msgCnt, costMs);
}

public void addSucMsgInfo(String groupId, String streamId, int msgCnt, long sdWastMs, long cbWastMs) {
public void addSucMsgInfo(String groupId, String streamId, int msgCnt, long sdCostMs, long cbCostMs) {
String recordKey = getKeyStringByConfig(groupId, streamId);
TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
if (trafficInfo == null) {
Expand All @@ -232,7 +232,7 @@ public void addSucMsgInfo(String groupId, String streamId, int msgCnt, long sdWa
trafficInfo = tmpInfo;
}
}
trafficInfo.addSucMsgInfo(msgCnt, sdWastMs, cbWastMs);
trafficInfo.addSucMsgInfo(msgCnt, sdCostMs, cbCostMs);
}

public void addFailMsgInfo(String groupId, String streamId, int msgCnt, int errCode) {
Expand All @@ -250,7 +250,7 @@ public void addFailMsgInfo(String groupId, String streamId, int msgCnt, int errC
}

public void addFailMsgInfo(String groupId, String streamId,
int msgCnt, int errCode, long cbWastMs) {
int msgCnt, int errCode, long cbCostMs) {
String recordKey = getKeyStringByConfig(groupId, streamId);
TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
if (trafficInfo == null) {
Expand All @@ -260,7 +260,7 @@ public void addFailMsgInfo(String groupId, String streamId,
trafficInfo = tmpInfo;
}
}
trafficInfo.addFailMsgInfo(msgCnt, cbWastMs);
trafficInfo.addFailMsgInfo(msgCnt, cbCostMs);
addSendErrCodeInfo(errCode);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class TimeWastInfo {
public class TimeCostInfo {

private final String name;
// bucketT
Expand All @@ -33,39 +33,39 @@ public class TimeWastInfo {
private final AtomicLong minValue = new AtomicLong(Long.MAX_VALUE);
private final LongAdder totalCnt = new LongAdder();

public TimeWastInfo(String name) {
public TimeCostInfo(String name) {
this.name = name;
}

public void addTimeWastMs(long timeMs) {
this.sumTime.add(timeMs);
public void addTimeCostInMs(long timeInMs) {
this.sumTime.add(timeInMs);
this.totalCnt.increment();
this.updateMin(timeMs);
this.updateMax(timeMs);
if (timeMs < 2) {
addTimeWastBucketT("0t2");
} else if (timeMs < 4) {
addTimeWastBucketT("2t4");
} else if (timeMs < 8) {
addTimeWastBucketT("4t8");
} else if (timeMs < 16) {
addTimeWastBucketT("8t16");
} else if (timeMs < 32) {
addTimeWastBucketT("16t32");
} else if (timeMs < 96) {
addTimeWastBucketT("32t96");
} else if (timeMs < 128) {
addTimeWastBucketT("96t128");
} else if (timeMs < 256) {
addTimeWastBucketT("128t256");
} else if (timeMs < 512) {
addTimeWastBucketT("256t512");
} else if (timeMs < 1024) {
addTimeWastBucketT("512t1024");
} else if (timeMs < 20480) {
addTimeWastBucketT("1024t20480");
this.updateMin(timeInMs);
this.updateMax(timeInMs);
if (timeInMs < 2) {
addTimeCostBucketT("0t2");
} else if (timeInMs < 4) {
addTimeCostBucketT("2t4");
} else if (timeInMs < 8) {
addTimeCostBucketT("4t8");
} else if (timeInMs < 16) {
addTimeCostBucketT("8t16");
} else if (timeInMs < 32) {
addTimeCostBucketT("16t32");
} else if (timeInMs < 96) {
addTimeCostBucketT("32t96");
} else if (timeInMs < 128) {
addTimeCostBucketT("96t128");
} else if (timeInMs < 256) {
addTimeCostBucketT("128t256");
} else if (timeInMs < 512) {
addTimeCostBucketT("256t512");
} else if (timeInMs < 1024) {
addTimeCostBucketT("512t1024");
} else if (timeInMs < 20480) {
addTimeCostBucketT("1024t20480");
} else {
addTimeWastBucketT("20480t+∞");
addTimeCostBucketT("20480t+∞");
}
}

Expand Down Expand Up @@ -122,7 +122,7 @@ private void updateMax(long newValue) {
}
}

private void addTimeWastBucketT(String key) {
private void addTimeCostBucketT(String key) {
LongAdder longCount = this.sendTimeBucketT.get(key);
if (longCount == null) {
LongAdder tmpCount = new LongAdder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,38 +35,38 @@ public class TrafficInfo {
private final LongAdder failedPgkCount = new LongAdder();
// fMsg
private final LongAdder failedMsgCount = new LongAdder();
// sucMs
private final TimeWastInfo sendWastMs = new TimeWastInfo("sucMs");
// cbMs
private final TimeWastInfo callbackWastMs = new TimeWastInfo("cbMs");
// sucMs: send success time cost in Ms
private final TimeCostInfo sendCostMs = new TimeCostInfo("sucMs");
// cbMs: call back time cost in Ms
private final TimeCostInfo callbackCostMs = new TimeCostInfo("cbMs");

public TrafficInfo(String groupId, String streamId) {
this.groupId = groupId;
this.streamId = streamId;
}

public void addSucMsgInfo(int msgCnt, long wastMs) {
public void addSucMsgInfo(int msgCnt, long costMs) {
sendPkgCount.add(1);
sendMsgCount.add(msgCnt);
sendWastMs.addTimeWastMs(wastMs);
sendCostMs.addTimeCostInMs(costMs);
}

public void addSucMsgInfo(int msgCnt, long sdWastMs, long cbWastMs) {
public void addSucMsgInfo(int msgCnt, long sdCostMs, long cbCostMs) {
sendPkgCount.add(1);
sendMsgCount.add(msgCnt);
sendWastMs.addTimeWastMs(sdWastMs);
callbackWastMs.addTimeWastMs(cbWastMs);
sendCostMs.addTimeCostInMs(sdCostMs);
callbackCostMs.addTimeCostInMs(cbCostMs);
}

public void addFailMsgInfo(int msgCnt) {
failedPgkCount.add(1);
failedMsgCount.add(msgCnt);
}

public void addFailMsgInfo(int msgCnt, long cbWastMs) {
public void addFailMsgInfo(int msgCnt, long cbCostMs) {
failedPgkCount.add(1);
failedMsgCount.add(msgCnt);
callbackWastMs.addTimeWastMs(cbWastMs);
callbackCostMs.addTimeCostInMs(cbCostMs);
}

public void getAndResetValue(StringBuilder strBuff) {
Expand All @@ -79,9 +79,9 @@ public void getAndResetValue(StringBuilder strBuff) {
.append(",\"fMsg\":").append(failedPgkCount.sumThenReset())
.append(",\"fMsg\":").append(failedMsgCount.sumThenReset())
.append(",");
this.sendWastMs.getAndResetValue(strBuff);
this.sendCostMs.getAndResetValue(strBuff);
strBuff.append(",");
this.callbackWastMs.getAndResetValue(strBuff);
this.callbackCostMs.getAndResetValue(strBuff);
strBuff.append("}");
}

Expand All @@ -90,7 +90,7 @@ public void clear() {
this.sendMsgCount.reset();
this.failedPgkCount.reset();
this.failedMsgCount.reset();
this.sendWastMs.clear();
this.callbackWastMs.clear();
this.sendCostMs.clear();
this.callbackCostMs.clear();
}
}

0 comments on commit dc91b33

Please sign in to comment.