Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11706][SDK] Optimize HTTP Sender implementation #11707

Merged
merged 2 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
Expand Down Expand Up @@ -69,9 +71,9 @@ public void close() {
senderCacheLock.writeLock().lock();
try {
// release groupId mapped senders
totalSenderCnt = innReleaseAllGroupIdSenders(groupIdSenderMap);
totalSenderCnt = releaseAllGroupIdSenders(groupIdSenderMap);
// release clusterId mapped senders
totalSenderCnt += innReleaseAllClusterIdSenders(clusterIdSenderMap);
totalSenderCnt += releaseAllClusterIdSenders(clusterIdSenderMap);
} finally {
senderCacheLock.writeLock().unlock();
}
Expand All @@ -90,9 +92,9 @@ public void removeClient(BaseSender msgSender) {
senderCacheLock.writeLock().lock();
try {
if (msgSender.getFactoryClusterIdKey() == null) {
removed = innRemoveGroupIdSender(msgSender, groupIdSenderMap);
removed = removeGroupIdSender(msgSender, groupIdSenderMap);
} else {
removed = innRemoveClusterIdSender(msgSender, clusterIdSenderMap);
removed = removeClusterIdSender(msgSender, clusterIdSenderMap);
}
} finally {
senderCacheLock.writeLock().unlock();
Expand All @@ -108,7 +110,7 @@ public int getMsgSenderCount() {

public InLongTcpMsgSender genTcpSenderByGroupId(
TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException {
ProxyUtils.validProxyConfigNotNull(configure);
validProxyConfigNotNull(configure);
// query cached sender
String metaConfigKey = configure.getGroupMetaConfigKey();
InLongTcpMsgSender messageSender =
Expand Down Expand Up @@ -148,9 +150,51 @@ public InLongTcpMsgSender genTcpSenderByGroupId(
}
}

public InLongHttpMsgSender genHttpSenderByGroupId(
HttpMsgSenderConfig configure) throws ProxySdkException {
validProxyConfigNotNull(configure);
// query cached sender
String metaConfigKey = configure.getGroupMetaConfigKey();
InLongHttpMsgSender messageSender =
(InLongHttpMsgSender) groupIdSenderMap.get(metaConfigKey);
if (messageSender != null) {
return messageSender;
}
// valid configure info
ProcessResult procResult = new ProcessResult();
qryProxyMetaConfigure(configure, procResult);
// generate sender
senderCacheLock.writeLock().lock();
try {
// re-get the created sender based on the groupId key after locked
messageSender = (InLongHttpMsgSender) groupIdSenderMap.get(metaConfigKey);
if (messageSender != null) {
return messageSender;
}
// build a new sender based on groupId
messageSender = new InLongHttpMsgSender(configure, msgSenderFactory, null);
if (!messageSender.start(procResult)) {
messageSender.close();
throw new ProxySdkException("Failed to start groupId sender: " + procResult);
}
groupIdSenderMap.put(metaConfigKey, messageSender);
logger.info("MsgSenderFactory({}) generated a new groupId({}) sender({})",
this.factoryNo, metaConfigKey, messageSender.getSenderId());
return messageSender;
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
logger.warn("MsgSenderFactory({}) build groupId sender({}) exception",
this.factoryNo, metaConfigKey, ex);
}
throw new ProxySdkException("Failed to build groupId sender: " + ex.getMessage());
} finally {
senderCacheLock.writeLock().unlock();
}
}

public InLongTcpMsgSender genTcpSenderByClusterId(
TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException {
ProxyUtils.validProxyConfigNotNull(configure);
validProxyConfigNotNull(configure);
// get groupId's clusterIdKey
ProcessResult procResult = new ProcessResult();
ProxyConfigEntry proxyConfigEntry = qryProxyMetaConfigure(configure, procResult);;
Expand Down Expand Up @@ -191,6 +235,48 @@ public InLongTcpMsgSender genTcpSenderByClusterId(
}
}

public InLongHttpMsgSender genHttpSenderByClusterId(
HttpMsgSenderConfig configure) throws ProxySdkException {
validProxyConfigNotNull(configure);
// get groupId's clusterIdKey
ProcessResult procResult = new ProcessResult();
ProxyConfigEntry proxyConfigEntry = qryProxyMetaConfigure(configure, procResult);;
String clusterIdKey = ProxyUtils.buildClusterIdKey(
configure.getDataRptProtocol(), configure.getRegionName(), proxyConfigEntry.getClusterId());
// get local built sender
InLongHttpMsgSender messageSender = (InLongHttpMsgSender) clusterIdSenderMap.get(clusterIdKey);
if (messageSender != null) {
return messageSender;
}
// generate sender
senderCacheLock.writeLock().lock();
try {
// re-get the created sender based on the clusterId Key after locked
messageSender = (InLongHttpMsgSender) clusterIdSenderMap.get(clusterIdKey);
if (messageSender != null) {
return messageSender;
}
// build a new sender based on clusterId Key
messageSender = new InLongHttpMsgSender(configure, msgSenderFactory, clusterIdKey);
if (!messageSender.start(procResult)) {
messageSender.close();
throw new ProxySdkException("Failed to start cluster sender: " + procResult);
}
clusterIdSenderMap.put(clusterIdKey, messageSender);
logger.info("MsgSenderFactory({}) generated a new clusterId({}) sender({})",
this.factoryNo, clusterIdKey, messageSender.getSenderId());
return messageSender;
} catch (Throwable ex) {
if (exptCounter.shouldPrint()) {
logger.warn("MsgSenderFactory({}) build cluster sender({}) exception",
this.factoryNo, clusterIdKey, ex);
}
throw new ProxySdkException("Failed to build cluster sender: " + ex.getMessage());
} finally {
senderCacheLock.writeLock().unlock();
}
}

private ProxyConfigEntry qryProxyMetaConfigure(
ProxyClientConfig proxyConfig, ProcessResult procResult) throws ProxySdkException {
ProxyConfigManager inlongMetaQryMgr = new ProxyConfigManager(proxyConfig);
Expand All @@ -205,7 +291,7 @@ private ProxyConfigEntry qryProxyMetaConfigure(
return inlongMetaQryMgr.getProxyConfigEntry();
}

private boolean innRemoveGroupIdSender(BaseSender msgSender, Map<String, BaseSender> senderMap) {
private boolean removeGroupIdSender(BaseSender msgSender, Map<String, BaseSender> senderMap) {
BaseSender tmpSender = senderMap.get(msgSender.getMetaConfigKey());
if (tmpSender == null
|| !tmpSender.getSenderId().equals(msgSender.getSenderId())) {
Expand All @@ -214,7 +300,7 @@ private boolean innRemoveGroupIdSender(BaseSender msgSender, Map<String, BaseSen
return senderMap.remove(msgSender.getMetaConfigKey()) != null;
}

private boolean innRemoveClusterIdSender(BaseSender msgSender, Map<String, BaseSender> senderMap) {
private boolean removeClusterIdSender(BaseSender msgSender, Map<String, BaseSender> senderMap) {
BaseSender tmpSender = senderMap.get(msgSender.getFactoryClusterIdKey());
if (tmpSender == null
|| !tmpSender.getSenderId().equals(msgSender.getSenderId())) {
Expand All @@ -223,7 +309,7 @@ private boolean innRemoveClusterIdSender(BaseSender msgSender, Map<String, BaseS
return senderMap.remove(msgSender.getFactoryClusterIdKey()) != null;
}

private int innReleaseAllGroupIdSenders(Map<String, BaseSender> senderMap) {
private int releaseAllGroupIdSenders(Map<String, BaseSender> senderMap) {
int totalSenderCnt = 0;
for (Map.Entry<String, BaseSender> entry : senderMap.entrySet()) {
if (entry == null || entry.getValue() == null) {
Expand All @@ -243,7 +329,7 @@ private int innReleaseAllGroupIdSenders(Map<String, BaseSender> senderMap) {
return totalSenderCnt;
}

private int innReleaseAllClusterIdSenders(Map<String, BaseSender> senderMap) {
private int releaseAllClusterIdSenders(Map<String, BaseSender> senderMap) {
int totalSenderCnt = 0;
for (Map.Entry<String, BaseSender> entry : senderMap.entrySet()) {
if (entry == null
Expand All @@ -264,4 +350,10 @@ private int innReleaseAllClusterIdSenders(Map<String, BaseSender> senderMap) {
senderMap.clear();
return totalSenderCnt;
}

private void validProxyConfigNotNull(ProxyClientConfig configure) throws ProxySdkException {
if (configure == null) {
throw new ProxySdkException("configure is null!");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;

Expand Down Expand Up @@ -88,4 +90,22 @@ InLongTcpMsgSender genTcpSenderByClusterId(
*/
InLongTcpMsgSender genTcpSenderByClusterId(
TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException;

/**
* Get or generate a http sender from the factory according to groupId
*
* @param configure the sender configure
* @return the sender
*/
InLongHttpMsgSender genHttpSenderByGroupId(
HttpMsgSenderConfig configure) throws ProxySdkException;

/**
* Get or generate a http sender from the factory according to clusterId
*
* @param configure the sender configure
* @return the sender
*/
InLongHttpMsgSender genHttpSenderByClusterId(
HttpMsgSenderConfig configure) throws ProxySdkException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -79,7 +80,6 @@ public InLongTcpMsgSender genTcpSenderByGroupId(
if (!this.initialized.get()) {
throw new ProxySdkException("Please initialize the factory first!");
}
ProxyUtils.validProxyConfigNotNull(configure);
return this.baseMsgSenderFactory.genTcpSenderByGroupId(configure, selfDefineFactory);
}

Expand All @@ -95,7 +95,22 @@ public InLongTcpMsgSender genTcpSenderByClusterId(
if (!this.initialized.get()) {
throw new ProxySdkException("Please initialize the factory first!");
}
ProxyUtils.validProxyConfigNotNull(configure);
return this.baseMsgSenderFactory.genTcpSenderByClusterId(configure, selfDefineFactory);
}

@Override
public InLongHttpMsgSender genHttpSenderByGroupId(HttpMsgSenderConfig configure) throws ProxySdkException {
if (!this.initialized.get()) {
throw new ProxySdkException("Please initialize the factory first!");
}
return this.baseMsgSenderFactory.genHttpSenderByGroupId(configure);
}

@Override
public InLongHttpMsgSender genHttpSenderByClusterId(HttpMsgSenderConfig configure) throws ProxySdkException {
if (!this.initialized.get()) {
throw new ProxySdkException("Please initialize the factory first!");
}
return this.baseMsgSenderFactory.genHttpSenderByClusterId(configure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
Expand Down Expand Up @@ -86,7 +88,6 @@ public InLongTcpMsgSender genTcpSenderByGroupId(
if (!initialized.get()) {
throw new ProxySdkException("Please initialize the factory first!");
}
ProxyUtils.validProxyConfigNotNull(configure);
return baseMsgSenderFactory.genTcpSenderByGroupId(configure, selfDefineFactory);
}

Expand All @@ -102,7 +103,22 @@ public InLongTcpMsgSender genTcpSenderByClusterId(
if (!initialized.get()) {
throw new ProxySdkException("Please initialize the factory first!");
}
ProxyUtils.validProxyConfigNotNull(configure);
return baseMsgSenderFactory.genTcpSenderByClusterId(configure, selfDefineFactory);
}

@Override
public InLongHttpMsgSender genHttpSenderByGroupId(HttpMsgSenderConfig configure) throws ProxySdkException {
if (!initialized.get()) {
throw new ProxySdkException("Please initialize the factory first!");
}
return baseMsgSenderFactory.genHttpSenderByGroupId(configure);
}

@Override
public InLongHttpMsgSender genHttpSenderByClusterId(HttpMsgSenderConfig configure) throws ProxySdkException {
if (!initialized.get()) {
throw new ProxySdkException("Please initialize the factory first!");
}
return baseMsgSenderFactory.genHttpSenderByClusterId(configure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ public enum ErrorCode {
DP_UNCONFIGURED_GROUPID_OR_STREAMID(155, "DataProxy return unconfigured groupId or streamId"),
//
DP_RECEIVE_FAILURE(160, "DataProxy return message receive failure"),
//
HTTP_ASYNC_POOL_FULL(171, "Http async pool full"),
HTTP_ASYNC_OFFER_FAIL(172, "Http async offer event fail"),
HTTP_ASYNC_OFFER_EXCEPTION(173, "Http async offer event exception"),
HTTP_BUILD_CLIENT_EXCEPTION(174, "Http build client exception"),
//
BUILD_FORM_CONTENT_EXCEPTION(181, "Build form content exception"),
DP_RETURN_FAILURE(182, "DataProxy return failure"),
HTTP_VISIT_DP_EXCEPTION(183, "Http visit exception"),
DP_RETURN_UNKNOWN_ERROR(184, "DataProxy return unknown error"),

UNKNOWN_ERROR(9999, "Unknown error");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class SdkConsts {

public static String PREFIX_HTTP = "http://";
public static String PREFIX_HTTPS = "https://";
public static final String KEY_HTTP_FIELD_BODY = "body";
public static final String KEY_HTTP_FIELD_DELIMITER = "rcdDlmtr";

// dataproxy node config
public static final String MANAGER_DATAPROXY_API = "/inlong/manager/openapi/dataproxy/getIpList/";
Expand All @@ -32,6 +34,8 @@ public class SdkConsts {
public static final String BASIC_AUTH_HEADER = "authorization";
// default region name
public static final String VAL_DEF_REGION_NAME = "";
// http report method
public static final String DATAPROXY_REPORT_METHOD = "/dataproxy/message";
// config info sync interval in minutes
public static final int VAL_DEF_CONFIG_SYNC_INTERVAL_MIN = 3;
public static final int VAL_MIN_CONFIG_SYNC_INTERVAL_MIN = 1;
Expand Down
Loading
Loading