Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-9548][Agent] Supports HTTPS and can determine whether to enable it through configuration #9549

Merged
merged 4 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ public class FetcherConstants {
// default is 30s
public static final int DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT = 30;

// enable https
public static final String ENABLE_HTTPS = "enable.https";
public static final boolean DEFAULT_ENABLE_HTTPS = false;

// required config
public static final String AGENT_MANAGER_VIP_HTTP_HOST = "agent.manager.vip.http.host";
public static final String AGENT_MANAGER_VIP_HTTP_PORT = "agent.manager.vip.http.port";
public static final String AGENT_MANAGER_ADDR = "agent.manager.addr";

public static final String AGENT_MANAGER_VIP_HTTP_PATH = "agent.manager.vip.http.managerIp.path";
public static final String DEFAULT_AGENT_TDM_VIP_HTTP_PATH = "/agent/getManagerIpList";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@
import lombok.Data;

import static java.util.Objects.requireNonNull;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
import static org.apache.inlong.agent.constant.TaskConstants.SYNC_SEND_OPEN;
import static org.apache.inlong.common.enums.DataReportTypeEnum.NORMAL_SEND_TO_DATAPROXY;

Expand Down Expand Up @@ -376,8 +375,7 @@ private static Proxy getProxy(DataConfig dataConfigs) {
Proxy proxy = new Proxy();
Manager manager = new Manager();
AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
manager.setHost(agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST));
manager.setPort(agentConf.get(AGENT_MANAGER_VIP_HTTP_PORT));
manager.setAddr(agentConf.get(AGENT_MANAGER_ADDR));
proxy.setInlongGroupId(dataConfigs.getInlongGroupId());
proxy.setInlongStreamId(dataConfigs.getInlongStreamId());
proxy.setManager(manager);
Expand Down Expand Up @@ -538,8 +536,7 @@ public static class Task {
@Data
public static class Manager {

private String port;
private String host;
private String addr;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,34 @@

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;

import java.nio.charset.Charset;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_HTTP_APPLICATION_JSON;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_HTTP_SUCCESS_CODE;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_REQUEST_TIMEOUT;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
Expand All @@ -64,22 +71,29 @@ public class HttpManager {
private final CloseableHttpClient httpClient;
private final String secretId;
private final String secretKey;
private static boolean enableHttps;

public HttpManager(AgentConfiguration conf) {
httpClient = constructHttpClient(conf.getInt(AGENT_MANAGER_REQUEST_TIMEOUT,
DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT));
enableHttps = StringUtils.startsWith(agentConf.get(AGENT_MANAGER_ADDR), "https");
int timeout = conf.getInt(AGENT_MANAGER_REQUEST_TIMEOUT,
DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT);
if (enableHttps) {
httpClient = constructHttpsClient(timeout);
} else {
httpClient = constructHttpClient(timeout);
}
secretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
secretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
}

/**
* build base url for manager according to config
*
* example - http://127.0.0.1:8080/inlong/manager/openapi
* example(http) - http://127.0.0.1:8080/inlong/manager/openapi
* example(https) - https://127.0.0.1:8080/inlong/manager/openapi
*/
public static String buildBaseUrl() {
return "http://" + agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST)
+ ":" + agentConf.get(AGENT_MANAGER_VIP_HTTP_PORT)
return agentConf.get(AGENT_MANAGER_ADDR)
+ agentConf.get(AGENT_MANAGER_VIP_HTTP_PREFIX_PATH, DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH);
}

Expand All @@ -102,6 +116,31 @@ private synchronized CloseableHttpClient constructHttpClient(int timeout) {
return httpClientBuilder.build();
}

/**
* construct https client
*
* @param timeout timeout setting
* @return closeable timeout
*/
private static CloseableHttpClient constructHttpsClient(int timeout) {
long timeoutInMs = TimeUnit.SECONDS.toMillis(timeout);
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout((int) timeoutInMs)
.setSocketTimeout((int) timeoutInMs).build();
SSLContext sslContext = null;
try {
sslContext = SSLContexts.custom().build();
} catch (NoSuchAlgorithmException e) {
LOGGER.error("constructHttpsClient error ", e);
} catch (KeyManagementException e) {
LOGGER.error("constructHttpsClient error ", e);
}
SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext,
new String[]{"TLSv1.2"}, null,
SSLConnectionSocketFactory.getDefaultHostnameVerifier());

return HttpClients.custom().setDefaultRequestConfig(requestConfig).setSSLSocketFactory(sslsf).build();
}

/**
* doPost
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.FetcherConstants;
import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;
Expand Down Expand Up @@ -55,6 +56,7 @@ public AgentBaseTestsHelper setupAgentHome() {
boolean result = testRootDir.toFile().mkdirs();
LOGGER.info("try to create {}, result is {}", testRootDir, result);
AgentConfiguration.getAgentConf().set(AgentConstants.AGENT_HOME, testRootDir.toString());
AgentConfiguration.getAgentConf().set(FetcherConstants.AGENT_MANAGER_ADDR, "");
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,5 @@ public void testInstanceManager() {
await().atMost(1, TimeUnit.SECONDS).until(() -> manager.getInstanceProfile(instanceId) == null);
Assert.assertTrue(String.valueOf(instance.initTime), instance.initTime == MockInstance.INIT_TIME);
Assert.assertTrue(instance.runtime > instance.initTime);
Assert.assertTrue(instance.destroyTime > instance.runtime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_UNIQ_ID;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_UNIQ_ID;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_RETURN_PARAM_DATA;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_TASK_HTTP_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH;
import static org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData;
Expand Down Expand Up @@ -94,7 +93,7 @@ public ManagerFetcher(AgentManager agentManager) {
}

private boolean requiredKeys(AgentConfiguration conf) {
return conf.hasKey(AGENT_MANAGER_VIP_HTTP_HOST) && conf.hasKey(AGENT_MANAGER_VIP_HTTP_PORT);
return conf.hasKey(AGENT_MANAGER_ADDR);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@

import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_PROXY_SEND;
import static org.apache.inlong.agent.constant.JobConstants.JOB_PROXY_SEND;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
Expand All @@ -79,11 +78,7 @@ public class SenderManager {
// in case of thread abusing.
private ThreadFactory SHARED_FACTORY;
private static final AtomicLong METRIC_INDEX = new AtomicLong(0);
private final String managerHost;
private final int managerPort;
private final String netTag;
private final String localhost;
private final boolean isLocalVisit;
private final String managerAddr;
private final int totalAsyncBufSize;
private final int aliveConnectionNum;
private final boolean isCompress;
Expand All @@ -109,13 +104,8 @@ public class SenderManager {

public SenderManager(JobProfile jobConf, String inlongGroupId, String sourcePath) {
AgentConfiguration conf = AgentConfiguration.getAgentConf();
managerHost = conf.get(AGENT_MANAGER_VIP_HTTP_HOST);
managerPort = conf.getInt(AGENT_MANAGER_VIP_HTTP_PORT);
managerAddr = conf.get(AGENT_MANAGER_ADDR);
proxySend = jobConf.getBoolean(JOB_PROXY_SEND, DEFAULT_JOB_PROXY_SEND);
localhost = jobConf.get(CommonConstants.PROXY_LOCAL_HOST, CommonConstants.DEFAULT_PROXY_LOCALHOST);
netTag = jobConf.get(CommonConstants.PROXY_NET_TAG, CommonConstants.DEFAULT_PROXY_NET_TAG);
isLocalVisit = jobConf.getBoolean(
CommonConstants.PROXY_IS_LOCAL_VISIT, CommonConstants.DEFAULT_PROXY_IS_LOCAL_VISIT);
totalAsyncBufSize = jobConf
.getInt(
CommonConstants.PROXY_TOTAL_ASYNC_PROXY_SIZE,
Expand Down Expand Up @@ -199,9 +189,8 @@ private AgentMetricItem getMetricItem(String groupId, String streamId) {
* @return DefaultMessageSender
*/
private DefaultMessageSender createMessageSender(String tagName) throws Exception {

ProxyClientConfig proxyClientConfig = new ProxyClientConfig(
localhost, isLocalVisit, managerHost, managerPort, tagName, authSecretId, authSecretKey);
ProxyClientConfig proxyClientConfig = new ProxyClientConfig(managerAddr, inlongGroupId, authSecretId,
authSecretKey);
proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
proxyClientConfig.setFile(isFile);
proxyClientConfig.setAliveConnections(aliveConnectionNum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,9 @@

import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_JOB_PROXY_SEND;
import static org.apache.inlong.agent.constant.TaskConstants.JOB_PROXY_SEND;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
Expand All @@ -81,11 +80,7 @@ public class SenderManager {
// in case of thread abusing.
private ThreadFactory SHARED_FACTORY;
private static final AtomicLong METRIC_INDEX = new AtomicLong(0);
private final String managerHost;
private final int managerPort;
private final String netTag;
private final String localhost;
private final boolean isLocalVisit;
private final String managerAddr;
private final int totalAsyncBufSize;
private final int aliveConnectionNum;
private final boolean isCompress;
Expand All @@ -110,17 +105,12 @@ public class SenderManager {
protected InstanceProfile profile;
private volatile boolean resendRunning = false;
private volatile boolean started = false;
private static final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();

public SenderManager(InstanceProfile profile, String inlongGroupId, String sourcePath) {
AgentConfiguration conf = AgentConfiguration.getAgentConf();
this.profile = profile;
managerHost = conf.get(AGENT_MANAGER_VIP_HTTP_HOST);
managerPort = conf.getInt(AGENT_MANAGER_VIP_HTTP_PORT);
managerAddr = agentConf.get(AGENT_MANAGER_ADDR);
proxySend = profile.getBoolean(JOB_PROXY_SEND, DEFAULT_JOB_PROXY_SEND);
localhost = profile.get(CommonConstants.PROXY_LOCAL_HOST, CommonConstants.DEFAULT_PROXY_LOCALHOST);
netTag = profile.get(CommonConstants.PROXY_NET_TAG, CommonConstants.DEFAULT_PROXY_NET_TAG);
isLocalVisit = profile.getBoolean(
CommonConstants.PROXY_IS_LOCAL_VISIT, CommonConstants.DEFAULT_PROXY_IS_LOCAL_VISIT);
totalAsyncBufSize = profile
.getInt(
CommonConstants.PROXY_TOTAL_ASYNC_PROXY_SIZE,
Expand All @@ -145,8 +135,8 @@ public SenderManager(InstanceProfile profile, String inlongGroupId, String sourc
enableBusyWait = profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
CommonConstants.DEFAULT_PROXY_CLIENT_ENABLE_BUSY_WAIT);
batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
authSecretId = agentConf.get(AGENT_MANAGER_AUTH_SECRET_ID);
authSecretKey = agentConf.get(AGENT_MANAGER_AUTH_SECRET_KEY);

this.sourcePath = sourcePath;
this.inlongGroupId = inlongGroupId;
Expand Down Expand Up @@ -205,9 +195,8 @@ private AgentMetricItem getMetricItem(String groupId, String streamId) {
* @param tagName we use group id as tag name
*/
private void createMessageSender(String tagName) throws Exception {

ProxyClientConfig proxyClientConfig = new ProxyClientConfig(
localhost, isLocalVisit, managerHost, managerPort, tagName, authSecretId, authSecretKey);
ProxyClientConfig proxyClientConfig = new ProxyClientConfig(managerAddr, inlongGroupId, authSecretId,
authSecretKey);
proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
proxyClientConfig.setFile(isFile);
proxyClientConfig.setAliveConnections(aliveConnectionNum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.FetcherConstants;
import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;
Expand Down Expand Up @@ -57,6 +58,7 @@ public AgentBaseTestsHelper setupAgentHome() {
boolean result = testRootDir.toFile().mkdirs();
LOGGER.info("try to create {}, result is {}", testRootDir, result);
AgentConfiguration.getAgentConf().set(AgentConstants.AGENT_HOME, testRootDir.toString());
AgentConfiguration.getAgentConf().set(FetcherConstants.AGENT_MANAGER_ADDR, "");
return this;
}

Expand All @@ -79,14 +81,14 @@ public void teardownAgentHome() {
}

public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
TaskStateEnum state) {
DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state);
TaskStateEnum state, String cycleUnit) {
DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state, cycleUnit);
TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
return profile;
}

private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
TaskStateEnum state) {
TaskStateEnum state, String cycleUnit) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("testGroupId");
dataConfig.setInlongStreamId("testStreamId");
Expand All @@ -100,7 +102,7 @@ private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long
// GMT-8:00 same with Asia/Shanghai
fileTaskConfig.setTimeZone("GMT-8:00");
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit("D");
fileTaskConfig.setCycleUnit(cycleUnit);
fileTaskConfig.setRetry(retry);
fileTaskConfig.setStartTime(startTime);
fileTaskConfig.setEndTime(endTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static void setup() {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING);
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D");
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime());
}
Expand Down
Loading