Skip to content

Commit

Permalink
[INLONG-11713][SDK] Optimize BaseMsgSenderFactory and TimeCostInfo im…
Browse files Browse the repository at this point in the history
…plementation (#11714)

* [INLONG-11713][SDK] Optimize BaseMsgSenderFactory and TimeCostInfo implementation

* [INLONG-11713][SDK] Optimize BaseMsgSenderFactory and TimeCostInfo implementation

---------

Co-authored-by: gosonzhang <[email protected]>
  • Loading branch information
gosonzhang and gosonzhang authored Feb 3, 2025
1 parent 2344e60 commit 8474807
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ public void testDataProxyCluster() {
Assertions.assertNotNull(id);

// save cluster node
String ip = "127.0.0.1";
String ip = "127.0.0.2";
Integer port1 = 46800;
Integer nodeId1 =
this.saveDataProxyClusterNode(id, ClusterType.DATAPROXY, ip, port1, ProtocolType.TCP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public BaseMsgSenderFactory(MsgSenderFactory msgSenderFactory, String factoryNo)
public void close() {
int totalSenderCnt;
int totalTDBankCnt;
logger.info("MsgSenderFactory({}) is closing", this.factoryNo);
senderCacheLock.writeLock().lock();
try {
// release groupId mapped senders
Expand Down Expand Up @@ -197,7 +198,7 @@ public InLongTcpMsgSender genTcpSenderByClusterId(
validProxyConfigNotNull(configure);
// get groupId's clusterIdKey
ProcessResult procResult = new ProcessResult();
ProxyConfigEntry proxyConfigEntry = qryProxyMetaConfigure(configure, procResult);;
ProxyConfigEntry proxyConfigEntry = qryProxyMetaConfigure(configure, procResult);
String clusterIdKey = ProxyUtils.buildClusterIdKey(
configure.getDataRptProtocol(), configure.getRegionName(), proxyConfigEntry.getClusterId());
// get local built sender
Expand Down Expand Up @@ -288,7 +289,7 @@ private ProxyConfigEntry qryProxyMetaConfigure(
&& !inlongMetaQryMgr.getEncryptConfigure(true, procResult)) {
throw new ProxySdkException("Failed to query remote encrypt config: " + procResult);
}
return inlongMetaQryMgr.getProxyConfigEntry();
return (ProxyConfigEntry) procResult.getRetData();
}

private boolean removeGroupIdSender(BaseSender msgSender, Map<String, BaseSender> senderMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,22 @@ public void addTimeCostInMs(long timeInMs) {
}

public void getAndResetValue(StringBuilder strBuff) {
long curCnt = totalCnt.sumThenReset();
if (curCnt == 0) {
long curTotalCnt = totalCnt.sumThenReset();
if (curTotalCnt == 0) {
strBuff.append("\"").append(name)
.append("\":{\"bucketT\":{},\"min\":0,\"max\":0,\"avgT\":0,\"cnt\":0}");
} else {
curCnt = 0;
long bucketCnt = 0;
strBuff.append("\"").append(name).append("\":{\"bucketT\":{");
for (Map.Entry<String, LongAdder> entry : sendTimeBucketT.entrySet()) {
if (curCnt++ > 0) {
if (bucketCnt++ > 0) {
strBuff.append(",");
}
strBuff.append("\"").append(entry.getKey()).append("\":").append(entry.getValue());
}
strBuff.append("},\"min\":").append(this.minValue.getAndSet(Long.MAX_VALUE))
.append(",\"max\":").append(this.maxValue.getAndSet(Long.MIN_VALUE))
.append(",\"avgT\":").append(sumTime.sumThenReset() / curCnt).append("}");
.append(",\"avgT\":").append(sumTime.sumThenReset() / curTotalCnt).append("}");
sendTimeBucketT.clear();
}
}
Expand Down

0 comments on commit 8474807

Please sign in to comment.