From 83fabafedcf4279586dc844bea294dcfa50cb9cf Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Wed, 3 Jan 2024 20:38:27 +0800 Subject: [PATCH 1/3] [INLONG-9548][Agent] Supports HTTPS and can determine whether to enable it through configuration --- .../agent/constant/FetcherConstants.java | 4 ++ .../inlong/agent/utils/HttpManager.java | 56 +++++++++++++++++-- .../sinks/filecollect/SenderManager.java | 19 ++++--- 3 files changed, 66 insertions(+), 13 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java index 642fac298f..92bb0a8893 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java @@ -31,6 +31,10 @@ public class FetcherConstants { // default is 30s public static final int DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT = 30; + // enable https + public static final String ENABLE_HTTPS = "enable.https"; + public static final boolean DEFAULT_ENABLE_HTTPS = false; + // required config public static final String AGENT_MANAGER_VIP_HTTP_HOST = "agent.manager.vip.http.host"; public static final String AGENT_MANAGER_VIP_HTTP_PORT = "agent.manager.vip.http.port"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java index 70897dd1d4..06f557f14b 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java @@ -26,14 +26,21 @@ import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.ssl.SSLContexts; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLContext; + import java.nio.charset.Charset; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; import java.util.concurrent.TimeUnit; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_HTTP_APPLICATION_JSON; @@ -46,6 +53,8 @@ import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PREFIX_PATH; import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT; import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH; +import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_ENABLE_HTTPS; +import static org.apache.inlong.agent.constant.FetcherConstants.ENABLE_HTTPS; /** * Perform http operation @@ -64,10 +73,17 @@ public class HttpManager { private final CloseableHttpClient httpClient; private final String secretId; private final String secretKey; + private static boolean enableHttps; public HttpManager(AgentConfiguration conf) { - httpClient = constructHttpClient(conf.getInt(AGENT_MANAGER_REQUEST_TIMEOUT, - DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT)); + enableHttps = agentConf.getBoolean(ENABLE_HTTPS, DEFAULT_ENABLE_HTTPS); + int timeout = conf.getInt(AGENT_MANAGER_REQUEST_TIMEOUT, + DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT); + if (enableHttps) { + httpClient = constructHttpsClient(timeout); + } else { + httpClient = constructHttpClient(timeout); + } secretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID); secretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY); } @@ -75,10 +91,17 @@ public HttpManager(AgentConfiguration conf) { /** * build base url for manager according to config * - * example - http://127.0.0.1:8080/inlong/manager/openapi + * example(http) - http://127.0.0.1:8080/inlong/manager/openapi + * example(https) - https://127.0.0.1:8080/inlong/manager/openapi */ public static String buildBaseUrl() { - return "http://" + agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST) + String urlHead; + if (enableHttps) { + urlHead = "https://"; + } else { + urlHead = "http://"; + } + return urlHead + agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST) + ":" + agentConf.get(AGENT_MANAGER_VIP_HTTP_PORT) + agentConf.get(AGENT_MANAGER_VIP_HTTP_PREFIX_PATH, DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH); } @@ -102,6 +125,31 @@ private synchronized CloseableHttpClient constructHttpClient(int timeout) { return httpClientBuilder.build(); } + /** + * construct https client + * + * @param timeout timeout setting + * @return closeable timeout + */ + private static CloseableHttpClient constructHttpsClient(int timeout) { + long timeoutInMs = TimeUnit.SECONDS.toMillis(timeout); + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout((int) timeoutInMs) + .setSocketTimeout((int) timeoutInMs).build(); + SSLContext sslContext = null; + try { + sslContext = SSLContexts.custom().build(); + } catch (NoSuchAlgorithmException e) { + LOGGER.error("constructHttpsClient error ", e); + } catch (KeyManagementException e) { + LOGGER.error("constructHttpsClient error ", e); + } + SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, + new String[]{"TLSv1.2"}, null, + SSLConnectionSocketFactory.getDefaultHostnameVerifier()); + + return HttpClients.custom().setDefaultRequestConfig(requestConfig).setSSLSocketFactory(sslsf).build(); + } + /** * doPost * diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index 933ed23583..220ca790a4 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -56,6 +56,8 @@ import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT; +import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_ENABLE_HTTPS; +import static org.apache.inlong.agent.constant.FetcherConstants.ENABLE_HTTPS; import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_JOB_PROXY_SEND; import static org.apache.inlong.agent.constant.TaskConstants.JOB_PROXY_SEND; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; @@ -85,7 +87,7 @@ public class SenderManager { private final int managerPort; private final String netTag; private final String localhost; - private final boolean isLocalVisit; + private final boolean unsecuredConnection; private final int totalAsyncBufSize; private final int aliveConnectionNum; private final boolean isCompress; @@ -110,17 +112,16 @@ public class SenderManager { protected InstanceProfile profile; private volatile boolean resendRunning = false; private volatile boolean started = false; + private static final AgentConfiguration agentConf = AgentConfiguration.getAgentConf(); public SenderManager(InstanceProfile profile, String inlongGroupId, String sourcePath) { - AgentConfiguration conf = AgentConfiguration.getAgentConf(); this.profile = profile; - managerHost = conf.get(AGENT_MANAGER_VIP_HTTP_HOST); - managerPort = conf.getInt(AGENT_MANAGER_VIP_HTTP_PORT); + managerHost = agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST); + managerPort = agentConf.getInt(AGENT_MANAGER_VIP_HTTP_PORT); proxySend = profile.getBoolean(JOB_PROXY_SEND, DEFAULT_JOB_PROXY_SEND); localhost = profile.get(CommonConstants.PROXY_LOCAL_HOST, CommonConstants.DEFAULT_PROXY_LOCALHOST); netTag = profile.get(CommonConstants.PROXY_NET_TAG, CommonConstants.DEFAULT_PROXY_NET_TAG); - isLocalVisit = profile.getBoolean( - CommonConstants.PROXY_IS_LOCAL_VISIT, CommonConstants.DEFAULT_PROXY_IS_LOCAL_VISIT); + unsecuredConnection = !agentConf.getBoolean(ENABLE_HTTPS, DEFAULT_ENABLE_HTTPS); totalAsyncBufSize = profile .getInt( CommonConstants.PROXY_TOTAL_ASYNC_PROXY_SIZE, @@ -145,8 +146,8 @@ public SenderManager(InstanceProfile profile, String inlongGroupId, String sourc enableBusyWait = profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT, CommonConstants.DEFAULT_PROXY_CLIENT_ENABLE_BUSY_WAIT); batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL); - authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID); - authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY); + authSecretId = agentConf.get(AGENT_MANAGER_AUTH_SECRET_ID); + authSecretKey = agentConf.get(AGENT_MANAGER_AUTH_SECRET_KEY); this.sourcePath = sourcePath; this.inlongGroupId = inlongGroupId; @@ -207,7 +208,7 @@ private AgentMetricItem getMetricItem(String groupId, String streamId) { private void createMessageSender(String tagName) throws Exception { ProxyClientConfig proxyClientConfig = new ProxyClientConfig( - localhost, isLocalVisit, managerHost, managerPort, tagName, authSecretId, authSecretKey); + localhost, unsecuredConnection, managerHost, managerPort, tagName, authSecretId, authSecretKey); proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize); proxyClientConfig.setFile(isFile); proxyClientConfig.setAliveConnections(aliveConnectionNum); From 023291253695fcf46b3cae19ec93a94be3d22839 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Thu, 4 Jan 2024 20:49:29 +0800 Subject: [PATCH 2/3] [INLONG-9548][Agent] Supports HTTPS and can determine whether to enable it through configuration --- .../agent/constant/FetcherConstants.java | 3 +-- .../inlong/agent/pojo/TaskProfileDto.java | 9 +++----- .../inlong/agent/utils/HttpManager.java | 17 ++++---------- .../agent/core/AgentBaseTestsHelper.java | 2 ++ .../agent/plugin/fetcher/ManagerFetcher.java | 5 ++--- .../agent/plugin/sinks/SenderManager.java | 21 +++++------------- .../sinks/filecollect/SenderManager.java | 22 +++++-------------- .../agent/plugin/AgentBaseTestsHelper.java | 10 +++++---- .../sinks/filecollect/TestSenderManager.java | 2 +- .../plugin/sources/TestLogFileSource.java | 2 +- .../plugin/task/TestLogfileCollectTask.java | 2 +- .../sdk/dataproxy/ProxyClientConfig.java | 8 +++++-- 12 files changed, 37 insertions(+), 66 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java index 92bb0a8893..c336bce2cd 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java @@ -36,8 +36,7 @@ public class FetcherConstants { public static final boolean DEFAULT_ENABLE_HTTPS = false; // required config - public static final String AGENT_MANAGER_VIP_HTTP_HOST = "agent.manager.vip.http.host"; - public static final String AGENT_MANAGER_VIP_HTTP_PORT = "agent.manager.vip.http.port"; + public static final String AGENT_MANAGER_ADDR = "agent.manager.addr"; public static final String AGENT_MANAGER_VIP_HTTP_PATH = "agent.manager.vip.http.managerIp.path"; public static final String DEFAULT_AGENT_TDM_VIP_HTTP_PATH = "/agent/getManagerIpList"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java index 6ce6ba2d3d..f4b78e3686 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java @@ -29,8 +29,7 @@ import lombok.Data; import static java.util.Objects.requireNonNull; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR; import static org.apache.inlong.agent.constant.TaskConstants.SYNC_SEND_OPEN; import static org.apache.inlong.common.enums.DataReportTypeEnum.NORMAL_SEND_TO_DATAPROXY; @@ -376,8 +375,7 @@ private static Proxy getProxy(DataConfig dataConfigs) { Proxy proxy = new Proxy(); Manager manager = new Manager(); AgentConfiguration agentConf = AgentConfiguration.getAgentConf(); - manager.setHost(agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST)); - manager.setPort(agentConf.get(AGENT_MANAGER_VIP_HTTP_PORT)); + manager.setAddr(agentConf.get(AGENT_MANAGER_ADDR)); proxy.setInlongGroupId(dataConfigs.getInlongGroupId()); proxy.setInlongStreamId(dataConfigs.getInlongStreamId()); proxy.setManager(manager); @@ -538,8 +536,7 @@ public static class Task { @Data public static class Manager { - private String port; - private String host; + private String addr; } @Data diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java index 06f557f14b..aa4e312696 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java @@ -22,6 +22,7 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import org.apache.commons.lang3.StringUtils; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; @@ -45,16 +46,13 @@ import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_HTTP_APPLICATION_JSON; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_HTTP_SUCCESS_CODE; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_REQUEST_TIMEOUT; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PREFIX_PATH; import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT; import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH; -import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_ENABLE_HTTPS; -import static org.apache.inlong.agent.constant.FetcherConstants.ENABLE_HTTPS; /** * Perform http operation @@ -76,7 +74,7 @@ public class HttpManager { private static boolean enableHttps; public HttpManager(AgentConfiguration conf) { - enableHttps = agentConf.getBoolean(ENABLE_HTTPS, DEFAULT_ENABLE_HTTPS); + enableHttps = StringUtils.startsWith(agentConf.get(AGENT_MANAGER_ADDR), "https"); int timeout = conf.getInt(AGENT_MANAGER_REQUEST_TIMEOUT, DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT); if (enableHttps) { @@ -95,14 +93,7 @@ public HttpManager(AgentConfiguration conf) { * example(https) - https://127.0.0.1:8080/inlong/manager/openapi */ public static String buildBaseUrl() { - String urlHead; - if (enableHttps) { - urlHead = "https://"; - } else { - urlHead = "http://"; - } - return urlHead + agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST) - + ":" + agentConf.get(AGENT_MANAGER_VIP_HTTP_PORT) + return agentConf.get(AGENT_MANAGER_ADDR) + agentConf.get(AGENT_MANAGER_VIP_HTTP_PREFIX_PATH, DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH); } diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java index 8207dd9a99..fa6d0ebd66 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java @@ -20,6 +20,7 @@ import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.constant.FetcherConstants; import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig; import org.apache.inlong.common.enums.TaskStateEnum; import org.apache.inlong.common.pojo.agent.DataConfig; @@ -55,6 +56,7 @@ public AgentBaseTestsHelper setupAgentHome() { boolean result = testRootDir.toFile().mkdirs(); LOGGER.info("try to create {}, result is {}", testRootDir, result); AgentConfiguration.getAgentConf().set(AgentConstants.AGENT_HOME, testRootDir.toString()); + AgentConfiguration.getAgentConf().set(FetcherConstants.AGENT_MANAGER_ADDR, ""); return this; } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java index 51e9d22508..fabc02a139 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java @@ -48,10 +48,9 @@ import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_UNIQ_ID; import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_UNIQ_ID; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_RETURN_PARAM_DATA; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_TASK_HTTP_PATH; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT; import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH; import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH; import static org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData; @@ -94,7 +93,7 @@ public ManagerFetcher(AgentManager agentManager) { } private boolean requiredKeys(AgentConfiguration conf) { - return conf.hasKey(AGENT_MANAGER_VIP_HTTP_HOST) && conf.hasKey(AGENT_MANAGER_VIP_HTTP_PORT); + return conf.hasKey(AGENT_MANAGER_ADDR); } /** diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java index fcbac27121..ac0b19fcff 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java @@ -51,10 +51,9 @@ import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT; import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_PROXY_SEND; import static org.apache.inlong.agent.constant.JobConstants.JOB_PROXY_SEND; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; @@ -79,11 +78,7 @@ public class SenderManager { // in case of thread abusing. private ThreadFactory SHARED_FACTORY; private static final AtomicLong METRIC_INDEX = new AtomicLong(0); - private final String managerHost; - private final int managerPort; - private final String netTag; - private final String localhost; - private final boolean isLocalVisit; + private final String managerAddr; private final int totalAsyncBufSize; private final int aliveConnectionNum; private final boolean isCompress; @@ -109,13 +104,8 @@ public class SenderManager { public SenderManager(JobProfile jobConf, String inlongGroupId, String sourcePath) { AgentConfiguration conf = AgentConfiguration.getAgentConf(); - managerHost = conf.get(AGENT_MANAGER_VIP_HTTP_HOST); - managerPort = conf.getInt(AGENT_MANAGER_VIP_HTTP_PORT); + managerAddr = conf.get(AGENT_MANAGER_ADDR); proxySend = jobConf.getBoolean(JOB_PROXY_SEND, DEFAULT_JOB_PROXY_SEND); - localhost = jobConf.get(CommonConstants.PROXY_LOCAL_HOST, CommonConstants.DEFAULT_PROXY_LOCALHOST); - netTag = jobConf.get(CommonConstants.PROXY_NET_TAG, CommonConstants.DEFAULT_PROXY_NET_TAG); - isLocalVisit = jobConf.getBoolean( - CommonConstants.PROXY_IS_LOCAL_VISIT, CommonConstants.DEFAULT_PROXY_IS_LOCAL_VISIT); totalAsyncBufSize = jobConf .getInt( CommonConstants.PROXY_TOTAL_ASYNC_PROXY_SIZE, @@ -199,9 +189,8 @@ private AgentMetricItem getMetricItem(String groupId, String streamId) { * @return DefaultMessageSender */ private DefaultMessageSender createMessageSender(String tagName) throws Exception { - - ProxyClientConfig proxyClientConfig = new ProxyClientConfig( - localhost, isLocalVisit, managerHost, managerPort, tagName, authSecretId, authSecretKey); + ProxyClientConfig proxyClientConfig = new ProxyClientConfig(managerAddr, inlongGroupId, authSecretId, + authSecretKey); proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize); proxyClientConfig.setFile(isFile); proxyClientConfig.setAliveConnections(aliveConnectionNum); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index 220ca790a4..17c1d3ff8a 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -52,12 +52,9 @@ import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT; -import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_ENABLE_HTTPS; -import static org.apache.inlong.agent.constant.FetcherConstants.ENABLE_HTTPS; import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_JOB_PROXY_SEND; import static org.apache.inlong.agent.constant.TaskConstants.JOB_PROXY_SEND; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; @@ -83,11 +80,7 @@ public class SenderManager { // in case of thread abusing. private ThreadFactory SHARED_FACTORY; private static final AtomicLong METRIC_INDEX = new AtomicLong(0); - private final String managerHost; - private final int managerPort; - private final String netTag; - private final String localhost; - private final boolean unsecuredConnection; + private final String managerAddr; private final int totalAsyncBufSize; private final int aliveConnectionNum; private final boolean isCompress; @@ -116,12 +109,8 @@ public class SenderManager { public SenderManager(InstanceProfile profile, String inlongGroupId, String sourcePath) { this.profile = profile; - managerHost = agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST); - managerPort = agentConf.getInt(AGENT_MANAGER_VIP_HTTP_PORT); + managerAddr = agentConf.get(AGENT_MANAGER_ADDR); proxySend = profile.getBoolean(JOB_PROXY_SEND, DEFAULT_JOB_PROXY_SEND); - localhost = profile.get(CommonConstants.PROXY_LOCAL_HOST, CommonConstants.DEFAULT_PROXY_LOCALHOST); - netTag = profile.get(CommonConstants.PROXY_NET_TAG, CommonConstants.DEFAULT_PROXY_NET_TAG); - unsecuredConnection = !agentConf.getBoolean(ENABLE_HTTPS, DEFAULT_ENABLE_HTTPS); totalAsyncBufSize = profile .getInt( CommonConstants.PROXY_TOTAL_ASYNC_PROXY_SIZE, @@ -206,9 +195,8 @@ private AgentMetricItem getMetricItem(String groupId, String streamId) { * @param tagName we use group id as tag name */ private void createMessageSender(String tagName) throws Exception { - - ProxyClientConfig proxyClientConfig = new ProxyClientConfig( - localhost, unsecuredConnection, managerHost, managerPort, tagName, authSecretId, authSecretKey); + ProxyClientConfig proxyClientConfig = new ProxyClientConfig(managerAddr, inlongGroupId, authSecretId, + authSecretKey); proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize); proxyClientConfig.setFile(isFile); proxyClientConfig.setAliveConnections(aliveConnectionNum); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java index 465180cb8f..2e61c6766b 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java @@ -20,6 +20,7 @@ import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.constant.FetcherConstants; import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig; import org.apache.inlong.common.enums.TaskStateEnum; import org.apache.inlong.common.pojo.agent.DataConfig; @@ -57,6 +58,7 @@ public AgentBaseTestsHelper setupAgentHome() { boolean result = testRootDir.toFile().mkdirs(); LOGGER.info("try to create {}, result is {}", testRootDir, result); AgentConfiguration.getAgentConf().set(AgentConstants.AGENT_HOME, testRootDir.toString()); + AgentConfiguration.getAgentConf().set(FetcherConstants.AGENT_MANAGER_ADDR, ""); return this; } @@ -79,14 +81,14 @@ public void teardownAgentHome() { } public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, Long startTime, Long endTime, - TaskStateEnum state) { - DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state); + TaskStateEnum state, String cycleUnit) { + DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state, cycleUnit); TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig); return profile; } private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime, - TaskStateEnum state) { + TaskStateEnum state, String cycleUnit) { DataConfig dataConfig = new DataConfig(); dataConfig.setInlongGroupId("testGroupId"); dataConfig.setInlongStreamId("testStreamId"); @@ -100,7 +102,7 @@ private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long // GMT-8:00 same with Asia/Shanghai fileTaskConfig.setTimeZone("GMT-8:00"); fileTaskConfig.setMaxFileCount(100); - fileTaskConfig.setCycleUnit("D"); + fileTaskConfig.setCycleUnit(cycleUnit); fileTaskConfig.setRetry(retry); fileTaskConfig.setStartTime(startTime); fileTaskConfig.setEndTime(endTime); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java index 4e8ce6b413..c350f0275a 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java @@ -70,7 +70,7 @@ public static void setup() { String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); + TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D"); profile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime()); } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index 3a002064fb..4a2303363f 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -75,7 +75,7 @@ public static void setup() { private LogFileSource getSource(int taskId, long offset) { try { String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); + TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D"); String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); InstanceProfile instanceProfile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230928", AgentUtils.getCurrentTime()); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java index 29047919cc..70e316519a 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java @@ -79,7 +79,7 @@ public static void setup() { tempResourceName = LOADER.getResource("testScan/temp.txt").getPath(); File f = new File(tempResourceName); String pattern = f.getParent() + "/YYYYMMDD_[0-9]+/test_[0-9]+.txt"; - TaskProfile taskProfile = helper.getTaskProfile(1, pattern, true, 0L, 0L, TaskStateEnum.RUNNING); + TaskProfile taskProfile = helper.getTaskProfile(1, pattern, true, 0L, 0L, TaskStateEnum.RUNNING, "D"); try { String startStr = "2023-09-20 00:00:00"; String endStr = "2023-09-30 00:00:00"; diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java index 10d4baac4d..9168c3a1f7 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java @@ -139,12 +139,16 @@ public ProxyClientConfig(String localHost, boolean requestByHttp, String manager /* pay attention to the last url parameter ip */ public ProxyClientConfig(String managerAddress, String inlongGroupId, String authSecretId, String authSecretKey, LoadBalance loadBalance, int virtualNode, int maxRetry) throws ProxysdkException { - if (Utils.isBlank(managerAddress)) { - throw new ProxysdkException("managerAddress is blank!"); + if (Utils.isBlank(managerAddress) || (!managerAddress.startsWith(ConfigConstants.HTTP) + && !managerAddress.startsWith(ConfigConstants.HTTPS))) { + throw new ProxysdkException("managerAddress is blank or missing http/https protocol "); } if (Utils.isBlank(inlongGroupId)) { throw new ProxysdkException("groupId is blank!"); } + if (managerAddress.startsWith(ConfigConstants.HTTPS)) { + this.requestByHttp = false; + } this.managerAddress = managerAddress; this.managerUrl = getManagerUrl(managerAddress, inlongGroupId); this.inlongGroupId = inlongGroupId; From eb15b7b2106ad173e3bdf246292dccbfa5b226f0 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Fri, 5 Jan 2024 10:12:22 +0800 Subject: [PATCH 3/3] [INLONG-9548][Agent] Supports HTTPS and can determine whether to enable it through configuration --- .../apache/inlong/agent/core/instance/TestInstanceManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java index 262565022e..34772636ad 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java @@ -118,6 +118,5 @@ public void testInstanceManager() { await().atMost(1, TimeUnit.SECONDS).until(() -> manager.getInstanceProfile(instanceId) == null); Assert.assertTrue(String.valueOf(instance.initTime), instance.initTime == MockInstance.INIT_TIME); Assert.assertTrue(instance.runtime > instance.initTime); - Assert.assertTrue(instance.destroyTime > instance.runtime); } }