From 966b51b26acd3684e2284262079996e82a00279b Mon Sep 17 00:00:00 2001 From: zh378814 Date: Thu, 1 Dec 2022 10:36:56 +0800 Subject: [PATCH 1/3] update http --- connectors/rocketmq-connect-http/README.md | 38 ++- connectors/rocketmq-connect-http/pom.xml | 49 +++- .../connect/http/sink/HttpSinkConnector.java | 94 ++++-- .../connect/http/sink/HttpSinkTask.java | 206 +++++++++++-- .../http/sink/auth/AbstractHttpClient.java | 26 ++ .../http/sink/auth/ApacheHttpClientImpl.java | 263 +++++++++++++++++ .../connect/http/sink/auth/ApiKeyImpl.java | 26 ++ .../rocketmq/connect/http/sink/auth/Auth.java | 16 + .../connect/http/sink/auth/BasicAuthImpl.java | 31 ++ .../connect/http/sink/auth/HttpCallback.java | 57 ++++ .../http/sink/auth/HttpRequestCallable.java | 91 ++++++ .../http/sink/auth/OAuthClientImpl.java | 148 ++++++++++ .../http/sink/auth/SocksProxyConfig.java | 38 +++ .../auth/ThreadLocalProxyAuthenticator.java | 30 ++ .../connect/http/sink/common/OkHttpUtils.java | 274 ------------------ .../http/sink/constant/AuthTypeEnum.java | 27 ++ .../http/sink/constant/HttpConstant.java | 31 +- .../http/sink/entity/ClientConfig.java | 206 +++++++++++++ .../connect/http/sink/entity/HttpRequest.java | 52 ++++ .../connect/http/sink/entity/OAuthEntity.java | 111 +++++++ .../connect/http/sink/entity/TokenEntity.java | 124 ++++++++ .../connect/http/sink/util/CheckUtils.java | 33 +++ .../connect/http/sink/util/JsonUtils.java | 76 +++++ .../http/sink/HttpSinkConnectorTest.java | 21 -- .../connect/http/sink/HttpSinkTaskTest.java | 82 ++++++ 25 files changed, 1785 insertions(+), 365 deletions(-) create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/AbstractHttpClient.java create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ApacheHttpClientImpl.java create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ApiKeyImpl.java create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/Auth.java create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/BasicAuthImpl.java create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpCallback.java create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpRequestCallable.java create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/OAuthClientImpl.java create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/SocksProxyConfig.java create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ThreadLocalProxyAuthenticator.java delete mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/common/OkHttpUtils.java create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/constant/AuthTypeEnum.java create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/ClientConfig.java create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/HttpRequest.java create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/OAuthEntity.java create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/TokenEntity.java create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/util/CheckUtils.java create mode 100644 connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/util/JsonUtils.java create mode 100644 connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkTaskTest.java diff --git a/connectors/rocketmq-connect-http/README.md b/connectors/rocketmq-connect-http/README.md index 8b6d5c70..076c4e4f 100644 --- a/connectors/rocketmq-connect-http/README.md +++ b/connectors/rocketmq-connect-http/README.md @@ -15,13 +15,18 @@ mvn clean install -Dmaven.test.skip=true ``` http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-sink-connector-name} -?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname" : "${connect-topicname}","url":"${url}"} +?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"HttpSinkConnector", +"urlPattern":"${urlPattern}","method":"${method}","queryStringParameters":"${queryStringParameters}","headerParameters":"${headerParameters}","bodys":"${bodys}","authType":"${authType}","basicUser":"${basicUser}","basicPassword":"${basicPassword}", +"oauth2Endpoint":"${oauth2Endpoint}","oauth2ClientId":"${oauth2ClientId}","oauth2ClientSecret":"${oauth2ClientSecret}","oauth2HttpMethod":"${oauth2HttpMethod}","proxyType":"${proxyType}","proxyHost":"${proxyHost}","proxyPort":"${proxyPort}","proxyUser":"${proxyUser}", +"proxyPort":"${proxyPort}","proxyPort":"${proxyPort}","proxyUser":"${proxyUser}","proxyPassword":"${proxyPassword}","apiKeyName":"${apiKeyName}","apiKeyValue":"${apiKeyValue}","timeout":"${timeout}"} ``` -例子 +例子 ``` http://localhost:8081/connectors/httpConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster", -"connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname" : "http-topic","url":"192.168.1.2"} +"connector-class":"HttpSinkConnector","urlPattern":"http://127.0.0.1","method":"POST","queryStringParameters":"","headerParameters":"","bodys":"{"id" : "234"}","authType":"BASIC_AUTH","basicUser":"","basicPassword":"", +"oauth2Endpoint":"","oauth2ClientId":"","oauth2ClientSecret":"","oauth2HttpMethod":"","proxyType":"","proxyHost":"","proxyPort":"","proxyUser":"", +"proxyPort":"","proxyPort":"","proxyUser":"","proxyPassword":"","apiKeyName":"","apiKeyValue":"","timeout":"6000"} ``` >**注:** `rocketmq-http-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中 @@ -35,8 +40,25 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-connector-name}/ ## rocketmq-connect-http 参数说明 * **http-sink-connector 参数说明** -| KEY | TYPE | Must be filled | Description | Example -|-----|---------|----------------|-------------|------------------| -| url | String | YES | sink端 域名地址 | http://127.0.0.1 | -|connect-topicname | String | YES | sink需要处理数据消息topic | xxxx | - +| KEY | TYPE | Must be filled | Description | Example +|-----------------------|---------|----------------|----------------|------------------| +| urlPattern | String | YES | sink端 域名地址 | http://127.0.0.1 | +| method | String | YES | 请求类型 | POST、GET | +| queryStringParameters | String | NO | 请求参数 | xxxx | +| headerParameters | String | NO | 请求头 | xxxx | +| bodys | String | NO | 请求体 | xxxx | +| authType | String | NO | 权限类型 | BASIC_AUTH、OAUTH_AUTH、API_KEY_AUTH | +| basicUser | String | NO | 用户名 | xxxx | +| basicPassword | String | NO | 密码 | xxxx | +| oauth2Endpoint | String | NO | OAuth获取token地址 | http://127.0.0.1 | +| oauth2ClientId | String | NO | clientId | xxxx | +| oauth2ClientSecret | String | NO | client secret | xxxx | +| oauth2HttpMethod | String | NO | oauth的请求类型 | xxxx | +| proxyType | String | NO | 代理类型 | xxxx | +| proxyHost | String | NO | 代理地址 | xxxx | +| proxyPort | String | NO | 代理端口 | xxxx | +| proxyUser | String | NO | 代理的访问的用户名 | xxxx | +| proxyPassword | String | NO | 代理访问的密码 | xxxx | +| apiKeyName | String | NO | auth api key | xxxx | +| apiKeyValue | String | NO | auth api value | xxxx | +| timeout | String | NO | 超时时间 | xxxx | diff --git a/connectors/rocketmq-connect-http/pom.xml b/connectors/rocketmq-connect-http/pom.xml index 7bbadfe0..f0b655e8 100644 --- a/connectors/rocketmq-connect-http/pom.xml +++ b/connectors/rocketmq-connect-http/pom.xml @@ -14,15 +14,18 @@ 8 1.7.7 1.2.9 - 2.9.0 - 4.13.1 - 2.6.0 - 2.6.3 - 0.1.2-SNAPSHOT + 2.10 + 4.13.2 + 3.23.1 + 4.8.0 + 0.1.5-SNAPSHOT 3.9.1 - 1.2.83 + 2.0.19 3.12.0 UTF-8 + 31.1-jre + 4.4.1 + 4.1.85.Final @@ -183,9 +186,15 @@ test - com.squareup.okhttp3 - okhttp - ${okhttp.version} + org.apache.commons + commons-lang3 + ${commons-lang3.version} + + + com.google.code.gson + gson + ${gson.version} + compile com.alibaba @@ -193,9 +202,25 @@ ${fastjson.version} - org.apache.commons - commons-lang3 - ${commons-lang3.version} + com.google.guava + guava + ${guava.version} + + + org.apache.httpcomponents + httpclient + ${httpclient.version} + + + org.apache.httpcomponents + httpcore + ${httpclient.version} + + + io.netty + netty-all + ${netty-all.version} + compile diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnector.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnector.java index 54c7efe5..c143faca 100644 --- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnector.java +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnector.java @@ -1,36 +1,63 @@ package org.apache.rocketmq.connect.http.sink; -import org.apache.rocketmq.connect.http.sink.constant.HttpConstant; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.Task; import io.openmessaging.connector.api.component.task.sink.SinkConnector; import io.openmessaging.internal.DefaultKeyValue; -import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.connect.http.sink.constant.HttpConstant; +import org.apache.rocketmq.connect.http.sink.util.CheckUtils; -import java.net.URL; -import java.net.URLConnection; import java.util.ArrayList; import java.util.List; public class HttpSinkConnector extends SinkConnector { - private String url; - - @Override - public void pause() { - - } + protected String urlPattern; + protected String method; + protected String queryStringParameters; + protected String headerParameters; + protected String bodys; + protected String authType; + protected String basicUser; + protected String basicPassword; + protected String oauth2Endpoint; + protected String oauth2ClientId; + protected String oauth2ClientSecret; + protected String oauth2HttpMethod; + protected String proxyType; + protected String proxyHost; + protected String proxyPort; + protected String proxyUser; + protected String proxyPassword; + protected String timeout; + protected String apiKeyName; + protected String apiKeyValue; - @Override - public void resume() { - - } @Override public List taskConfigs(int maxTasks) { List keyValueList = new ArrayList<>(11); KeyValue keyValue = new DefaultKeyValue(); - keyValue.put(HttpConstant.URL_CONSTANT, url); + keyValue.put(HttpConstant.URL_PATTERN_CONSTANT, urlPattern); + keyValue.put(HttpConstant.METHOD_CONSTANT, method); + keyValue.put(HttpConstant.QUERY_STRING_PARAMETERS_CONSTANT, queryStringParameters); + keyValue.put(HttpConstant.HEADER_PARAMETERS_CONSTANT, headerParameters); + keyValue.put(HttpConstant.BODYS_CONSTANT, bodys); + keyValue.put(HttpConstant.AUTH_TYPE_CONSTANT, authType); + keyValue.put(HttpConstant.BASIC_USER_CONSTANT, basicUser); + keyValue.put(HttpConstant.BASIC_PASSWORD_CONSTANT, basicPassword); + keyValue.put(HttpConstant.OAUTH2_ENDPOINT_CONSTANT, oauth2Endpoint); + keyValue.put(HttpConstant.OAUTH2_CLIENTID_CONSTANT, oauth2ClientId); + keyValue.put(HttpConstant.OAUTH2_CLIENTSECRET_CONSTANT, oauth2ClientSecret); + keyValue.put(HttpConstant.OAUTH2_HTTP_METHOD_CONSTANT, oauth2HttpMethod); + keyValue.put(HttpConstant.PROXY_TYPE_CONSTANT, proxyType); + keyValue.put(HttpConstant.PROXY_HOST_CONSTANT, proxyHost); + keyValue.put(HttpConstant.PROXY_PORT_CONSTANT, proxyPort); + keyValue.put(HttpConstant.PROXY_USER_CONSTANT, proxyUser); + keyValue.put(HttpConstant.PROXY_PASSWORD_CONSTANT, proxyPassword); + keyValue.put(HttpConstant.TIMEOUT_CONSTANT, timeout); + keyValue.put(HttpConstant.API_KEY_NAME, apiKeyName); + keyValue.put(HttpConstant.API_KEY_VALUE, apiKeyValue); keyValueList.add(keyValue); return keyValueList; } @@ -42,24 +69,35 @@ public Class taskClass() { @Override public void validate(KeyValue config) { - if (StringUtils.isBlank(config.getString(HttpConstant.URL_CONSTANT))) { - throw new RuntimeException("http required parameter is null !"); - } - try { - URL urlConnect = new URL(config.getString(HttpConstant.URL_CONSTANT)); - URLConnection urlConnection = urlConnect.openConnection(); - urlConnection.setConnectTimeout(5000); - urlConnection.connect(); - } catch (Exception e) { - throw new RuntimeException(e.getMessage()); - } } @Override - public void init(KeyValue config) { - url = config.getString(HttpConstant.URL_CONSTANT); + public void start(KeyValue config) { + urlPattern = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.URL_PATTERN_CONSTANT)); + method = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.METHOD_CONSTANT)); + queryStringParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.QUERY_STRING_PARAMETERS_CONSTANT)); + headerParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.HEADER_PARAMETERS_CONSTANT)); + bodys = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BODYS_CONSTANT)); + authType = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.AUTH_TYPE_CONSTANT)); + basicUser = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BASIC_USER_CONSTANT)); + basicPassword = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BASIC_PASSWORD_CONSTANT)); + oauth2Endpoint = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_ENDPOINT_CONSTANT)); + oauth2ClientId = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_CLIENTID_CONSTANT)); + oauth2ClientSecret = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_CLIENTSECRET_CONSTANT)); + oauth2HttpMethod = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_HTTP_METHOD_CONSTANT)); + proxyType = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_TYPE_CONSTANT)); + proxyHost = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_HOST_CONSTANT)); + proxyPort = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_PORT_CONSTANT)); + proxyUser = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_USER_CONSTANT)); + proxyPassword = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_PASSWORD_CONSTANT)); + timeout = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.TIMEOUT_CONSTANT)); + apiKeyName = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.API_KEY_NAME)); + apiKeyValue = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.API_KEY_VALUE)); + queryStringParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.QUERY_STRING_PARAMETERS_CONSTANT)); + headerParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.HEADER_PARAMETERS_CONSTANT)); } + @Override public void stop() { diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java index 603bafaf..70841509 100644 --- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java @@ -1,61 +1,227 @@ package org.apache.rocketmq.connect.http.sink; +import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Maps; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.sink.SinkTask; -import io.openmessaging.connector.api.component.task.sink.SinkTaskContext; import io.openmessaging.connector.api.data.ConnectRecord; import io.openmessaging.connector.api.errors.ConnectException; -import org.apache.rocketmq.connect.http.sink.common.OkHttpUtils; +import io.openmessaging.connector.api.errors.RetriableException; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.connect.http.sink.auth.AbstractHttpClient; +import org.apache.rocketmq.connect.http.sink.auth.ApacheHttpClientImpl; +import org.apache.rocketmq.connect.http.sink.auth.ApiKeyImpl; +import org.apache.rocketmq.connect.http.sink.auth.BasicAuthImpl; +import org.apache.rocketmq.connect.http.sink.auth.HttpCallback; +import org.apache.rocketmq.connect.http.sink.auth.OAuthClientImpl; +import org.apache.rocketmq.connect.http.sink.constant.AuthTypeEnum; import org.apache.rocketmq.connect.http.sink.constant.HttpConstant; +import org.apache.rocketmq.connect.http.sink.entity.ClientConfig; +import org.apache.rocketmq.connect.http.sink.entity.HttpRequest; +import org.apache.rocketmq.connect.http.sink.util.CheckUtils; +import org.apache.rocketmq.connect.http.sink.util.JsonUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; public class HttpSinkTask extends SinkTask { private static final Logger log = LoggerFactory.getLogger(HttpSinkTask.class); + private static final int DEFAULT_CONSUMER_TIMEOUT_SECONDS = 30; - private String url; + protected ScheduledExecutorService scheduledExecutorService; + protected String urlPattern; + protected String method; + protected String queryStringParameters; + protected String headerParameters; + protected String bodys; + protected String authType; + protected String basicUser; + protected String basicPassword; + protected String oauth2Endpoint; + protected String oauth2ClientId; + protected String oauth2ClientSecret; + protected String oauth2HttpMethod; + protected String proxyType; + protected String proxyHost; + protected String proxyPort; + protected String proxyUser; + protected String proxyPassword; + protected String apiKeyName; + protected String apiKeyValue; + protected String timeout; + + private AbstractHttpClient httpClient; + + private OAuthClientImpl oAuthClient; + + private BasicAuthImpl basicAuth; + + private ApiKeyImpl apiKey; @Override public void put(List sinkRecords) throws ConnectException { try { - sinkRecords.forEach(connectRecord -> OkHttpUtils.builder() - .url(url) - .addParam(HttpConstant.DATA_CONSTANT, connectRecord.getData().toString()) - .post(true) - .sync()); + CountDownLatch countDownLatch = new CountDownLatch(sinkRecords.size()); + HttpCallback httpCallback = new HttpCallback(countDownLatch); + for (ConnectRecord connectRecord : sinkRecords) { + ClientConfig clientConfig = getClientConfig(connectRecord); + Map headerMap = Maps.newHashMap(); + addHeaderMap(headerMap, clientConfig); + if (StringUtils.isNotBlank(clientConfig.getAuthType())) { + headerMap.putAll(auth(clientConfig)); + } + HttpRequest httpRequest = new HttpRequest(); + httpRequest.setBody(clientConfig.getBodys()); + httpRequest.setHeaderMap(headerMap); + httpRequest.setMethod(clientConfig.getMethod()); + httpRequest.setTimeout(clientConfig.getTimeout()); + httpRequest.setUrl(JsonUtils.queryStringAndPathValue(clientConfig.getUrlPattern(), clientConfig.getQueryStringParameters(), connectRecord.getExtension(HttpConstant.HTTP_PATH_VALUE))); + httpClient.execute(httpRequest, httpCallback); + } + boolean consumeSucceed = Boolean.FALSE; + try { + consumeSucceed = countDownLatch.await(DEFAULT_CONSUMER_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (Throwable e) { + log.error("count down latch failed.", e); + } + if (!consumeSucceed) { + throw new RetriableException("Request Timeout"); + } + if (httpCallback.isFailed()) { + throw new RetriableException(httpCallback.getMsg()); + } } catch (Exception e) { log.error("HttpSinkTask | put | error => ", e); + throw new RuntimeException(e); } } - @Override - public void pause() { - + private ClientConfig getClientConfig(ConnectRecord connectRecord) { + ClientConfig clientConfig = new ClientConfig(); + clientConfig.setHttpClient(httpClient); + clientConfig.setUrlPattern(urlPattern); + clientConfig.setMethod(CheckUtils.checkNull(method) ? connectRecord.getExtension(HttpConstant.HTTP_METHOD) : method); + clientConfig.setAuthType(authType); + clientConfig.setHttpPathValue(connectRecord.getExtension(HttpConstant.HTTP_PATH_VALUE)); + clientConfig.setQueryStringParameters(JsonUtils.mergeJson(JSONObject.parseObject(connectRecord.getExtension(HttpConstant.HTTP_QUERY_VALUE)), JSONObject.parseObject(queryStringParameters)) == null ? null : JsonUtils.mergeJson(JSONObject.parseObject(connectRecord.getExtension(HttpConstant.HTTP_QUERY_VALUE)), JSONObject.parseObject(queryStringParameters)).toJSONString()); + clientConfig.setHeaderParameters(JsonUtils.mergeJson(JSONObject.parseObject(connectRecord.getExtension(HttpConstant.HTTP_HEADER)), JSONObject.parseObject(headerParameters)) == null ? null : JsonUtils.mergeJson(JSONObject.parseObject(connectRecord.getExtension(HttpConstant.HTTP_HEADER)), JSONObject.parseObject(headerParameters)).toJSONString()); + clientConfig.setBodys(bodys); + clientConfig.setProxyUser(proxyUser); + clientConfig.setProxyPassword(proxyPassword); + clientConfig.setProxyType(proxyType); + clientConfig.setProxyPort(proxyPort); + clientConfig.setProxyHost(proxyHost); + clientConfig.setOauth2ClientId(oauth2ClientId); + clientConfig.setOauth2ClientSecret(oauth2ClientSecret); + clientConfig.setTimeout(timeout); + clientConfig.setOauth2HttpMethod(oauth2HttpMethod); + clientConfig.setOauth2Endpoint(oauth2Endpoint); + clientConfig.setBasicUser(basicUser); + clientConfig.setBasicPassword(basicPassword); + clientConfig.setApiKeyName(apiKeyName); + clientConfig.setApiKeyValue(apiKeyValue); + return clientConfig; } - @Override - public void resume() { - + private void addHeaderMap(Map headerMap, ClientConfig clientConfig) { + String header = clientConfig.getHeaderParameters(); + if (StringUtils.isBlank(header)) { + return; + } + JSONObject jsonObject = JSONObject.parseObject(header); + for (Map.Entry entry : jsonObject.entrySet()) { + if (entry.getValue() instanceof JSONObject) { + headerMap.put(entry.getKey(), ((JSONObject) entry.getValue()).toJSONString()); + } else { + headerMap.put(entry.getKey(), (String) entry.getValue()); + } + } } @Override public void validate(KeyValue config) { + if (CheckUtils.checkNull(config.getString(HttpConstant.URL_PATTERN_CONSTANT)) + || CheckUtils.checkNull(config.getString(HttpConstant.AUTH_TYPE_CONSTANT)) + || CheckUtils.checkNull(config.getString(HttpConstant.BODYS_CONSTANT))) { + throw new RuntimeException("http required parameter is null !"); + } + final List collect = Arrays.stream(AuthTypeEnum.values()).filter(authTypeEnum -> authTypeEnum.getAuthType().equals(config.getString(HttpConstant.AUTH_TYPE_CONSTANT))).collect(Collectors.toList()); + if (collect.isEmpty()) { + throw new RuntimeException("authType required parameter check is fail !"); + } } @Override - public void init(KeyValue config) { - url = config.getString(HttpConstant.URL_CONSTANT); + public void start(KeyValue config) { + urlPattern = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.URL_PATTERN_CONSTANT)); + method = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.METHOD_CONSTANT)); + bodys = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BODYS_CONSTANT)); + authType = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.AUTH_TYPE_CONSTANT)); + basicUser = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BASIC_USER_CONSTANT)); + basicPassword = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BASIC_PASSWORD_CONSTANT)); + oauth2Endpoint = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_ENDPOINT_CONSTANT)); + oauth2ClientId = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_CLIENTID_CONSTANT)); + oauth2ClientSecret = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_CLIENTSECRET_CONSTANT)); + oauth2HttpMethod = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_HTTP_METHOD_CONSTANT)); + proxyType = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_TYPE_CONSTANT)); + proxyHost = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_HOST_CONSTANT)); + proxyPort = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_PORT_CONSTANT)); + proxyUser = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_USER_CONSTANT)); + proxyPassword = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_PASSWORD_CONSTANT)); + timeout = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.TIMEOUT_CONSTANT)); + apiKeyName = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.API_KEY_NAME)); + apiKeyValue = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.API_KEY_VALUE)); + queryStringParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.QUERY_STRING_PARAMETERS_CONSTANT)); + headerParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.HEADER_PARAMETERS_CONSTANT)); + try { + httpClient = new ApacheHttpClientImpl(); + ClientConfig proxyConfig = new ClientConfig(); + proxyConfig.setProxyHost(proxyHost); + proxyConfig.setProxyPort(proxyPort); + proxyConfig.setProxyType(proxyType); + proxyConfig.setProxyUser(proxyUser); + proxyConfig.setProxyPassword(proxyPassword); + httpClient.init(proxyConfig); + oAuthClient = new OAuthClientImpl(); + oAuthClient.setOauthMap(new ConcurrentHashMap<>(16)); + scheduledExecutorService.scheduleAtFixedRate(oAuthClient, 1, 1, TimeUnit.SECONDS); + basicAuth = new BasicAuthImpl(); + apiKey = new ApiKeyImpl(); + } catch (Exception e) { + log.error("HttpSinkTask | start | error => ", e); + throw new RuntimeException(e); + } } @Override - public void start(SinkTaskContext sinkTaskContext) { - super.start(sinkTaskContext); + public void stop() { + httpClient.close(); } - @Override - public void stop() { + private Map auth(ClientConfig config) { + switch (config.getAuthType()) { + case "BASIC_AUTH": + return basicAuth.auth(config); + case "OAUTH_AUTH": + return oAuthClient.auth(config); + case "API_KEY_AUTH": + return apiKey.auth(config); + default: + break; + } + return new HashMap<>(16); + } + public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = scheduledExecutorService; } } diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/AbstractHttpClient.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/AbstractHttpClient.java new file mode 100644 index 00000000..7c996f54 --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/AbstractHttpClient.java @@ -0,0 +1,26 @@ +package org.apache.rocketmq.connect.http.sink.auth; + + +import org.apache.rocketmq.connect.http.sink.entity.ClientConfig; +import org.apache.rocketmq.connect.http.sink.entity.HttpRequest; + +import java.io.IOException; + +public interface AbstractHttpClient { + + /** + * + * @param config + */ + void init(ClientConfig config); + + /** + * + * @return + * @throws IOException + */ + String execute(HttpRequest httpRequest, HttpCallback httpCallback) throws Exception; + + void close(); + +} diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ApacheHttpClientImpl.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ApacheHttpClientImpl.java new file mode 100644 index 00000000..c4b1c4b7 --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ApacheHttpClientImpl.java @@ -0,0 +1,263 @@ +package org.apache.rocketmq.connect.http.sink.auth; + +import com.google.common.net.MediaType; +import io.netty.util.concurrent.DefaultThreadFactory; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpHead; +import org.apache.http.client.methods.HttpOptions; +import org.apache.http.client.methods.HttpPatch; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.client.methods.HttpTrace; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.DnsResolver; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.conn.socket.PlainConnectionSocketFactory; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.conn.ssl.TrustStrategy; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.protocol.HTTP; +import org.apache.http.protocol.HttpContext; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.rocketmq.connect.http.sink.entity.ClientConfig; +import org.apache.rocketmq.connect.http.sink.entity.HttpRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.net.Socket; +import java.net.UnknownHostException; +import java.security.cert.X509Certificate; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.apache.rocketmq.connect.http.sink.constant.HttpConstant.LOG_SIFT_TAG; + + +public class ApacheHttpClientImpl implements AbstractHttpClient { + private static final Logger log = LoggerFactory.getLogger(ApacheHttpClientImpl.class); + + private static ExecutorService executorServicePool = new ThreadPoolExecutor(200, 2000, 600, TimeUnit.SECONDS, + new LinkedBlockingDeque(1000), new DefaultThreadFactory("ApacheHttpClientRequestThread")); + private CloseableHttpClient httpClient = null; + + private SocksProxyConfig socksProxyConfig; + private static final String SOCKS_ADDRESS_KEY = "socks.address"; + + @Override + public void init(ClientConfig config) { + try { + SSLContextBuilder sslContextBuilder = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() { + @Override + public boolean isTrusted(X509Certificate[] chain, String authType) { + return true; + } + }); + Registry reg = RegistryBuilder.create().register("http", + new SocksPlainConnectionSocketFactory()) + .register("https", new SocksSSLConnectionSocketFactory(sslContextBuilder.build())) + .build(); + PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(reg, + new FakeDnsResolver()); + connManager.setMaxTotal(400); + connManager.setDefaultMaxPerRoute(500); + httpClient = HttpClients.custom() + .setConnectionManager(connManager) + .build(); + + this.socksProxyConfig = new SocksProxyConfig(config.getProxyHost(), config.getProxyUser(), config.getProxyPassword()); + } catch (Exception e) { + log.error("ApacheHttpClientImpl | init | error => ", e); + throw new RuntimeException(e); + } + } + + @Override + public String execute(HttpRequest httpRequest, HttpCallback httpCallback) throws Exception { + CloseableHttpResponse response; + HttpRequestBase httpRequestBase = null; + if (httpRequest != null) { + httpRequestBase = extracted(httpRequest.getUrl(), httpRequest.getMethod(), httpRequest.getHeaderMap(), httpRequest.getBody()); + if (StringUtils.isNotBlank(httpRequest.getTimeout())) { + final RequestConfig requestConfig = RequestConfig.custom(). + setConnectionRequestTimeout(Integer.parseInt(httpRequest.getTimeout())). + setSocketTimeout(Integer.parseInt(httpRequest.getTimeout())). + setConnectTimeout(Integer.parseInt(httpRequest.getTimeout())).build(); + httpRequestBase.setConfig(requestConfig); + } + } + HttpRequestCallable httpRequestCallable = new HttpRequestCallable(httpClient, httpRequestBase, + HttpClientContext.create(), this.socksProxyConfig, httpCallback, MDC.get(LOG_SIFT_TAG)); + Future submit = executorServicePool.submit(httpRequestCallable); + String result = submit.get(); + log.info("ApacheHttpClientImpl | execute| success | result : {}", result); + return result; + } + + private HttpRequestBase extracted(String url, String method, Map headerMap, String body) throws UnsupportedEncodingException { + switch (method) { + case "GET": + HttpGet httpGet = new HttpGet(url); + headerMap.forEach(httpGet::addHeader); + httpGet.addHeader(HTTP.CONTENT_TYPE, MediaType.JSON_UTF_8.toString()); + return httpGet; + case "POST": + HttpPost httpPost = new HttpPost(url); + headerMap.forEach(httpPost::addHeader); + httpPost.addHeader(HTTP.CONTENT_TYPE, MediaType.JSON_UTF_8.toString()); + if (StringUtils.isNotBlank(body)) { + HttpEntity entityPot = new StringEntity(body); + httpPost.setEntity(entityPot); + } + return httpPost; + case "DELETE": + HttpDelete httpDelete = new HttpDelete(url); + headerMap.forEach(httpDelete::addHeader); + httpDelete.addHeader(HTTP.CONTENT_TYPE, MediaType.JSON_UTF_8.toString()); + return httpDelete; + case "PUT": + HttpPut httpPut = new HttpPut(url); + headerMap.forEach(httpPut::addHeader); + httpPut.addHeader(HTTP.CONTENT_TYPE, MediaType.JSON_UTF_8.toString()); + if (StringUtils.isNotBlank(body)) { + HttpEntity entityPot = new StringEntity(body); + httpPut.setEntity(entityPot); + } + return httpPut; + case "HEAD": + HttpHead httpHead = new HttpHead(url); + headerMap.forEach(httpHead::addHeader); + httpHead.addHeader(HTTP.CONTENT_TYPE, MediaType.JSON_UTF_8.toString()); + return httpHead; + case "TRACE": + HttpTrace httpTrace = new HttpTrace(url); + headerMap.forEach(httpTrace::addHeader); + httpTrace.addHeader(HTTP.CONTENT_TYPE, MediaType.JSON_UTF_8.toString()); + break; + case "PATCH": + HttpPatch httpPatch = new HttpPatch(url); + headerMap.forEach(httpPatch::addHeader); + httpPatch.addHeader(HTTP.CONTENT_TYPE, MediaType.JSON_UTF_8.toString()); + if (StringUtils.isNotBlank(body)) { + HttpEntity entityPot = new StringEntity(body); + httpPatch.setEntity(entityPot); + } + return httpPatch; + default: + } + HttpOptions httpOptions = new HttpOptions(url); + headerMap.forEach(httpOptions::addHeader); + httpOptions.addHeader(HTTP.CONTENT_TYPE, MediaType.JSON_UTF_8.toString()); + return httpOptions; + } + + @Override + public void close() { + try { + httpClient.close(); + } catch (IOException e) { + log.error("ApacheHttpClientImpl | close | error => ", e); + } + } + + /** + * 实现 http 链接的socket 工厂 + */ + static class SocksPlainConnectionSocketFactory extends PlainConnectionSocketFactory { + + @Override + public Socket createSocket(final HttpContext context) throws IOException { + InetSocketAddress socksaddr = (InetSocketAddress)context.getAttribute(SOCKS_ADDRESS_KEY); + if (socksaddr != null) { + Proxy proxy = new Proxy(Proxy.Type.SOCKS, socksaddr); + return new Socket(proxy); + } else { + return new Socket(); + } + } + + @Override + public Socket connectSocket(int connectTimeout, Socket socket, HttpHost host, InetSocketAddress remoteAddress, + InetSocketAddress localAddress, HttpContext context) throws IOException { + InetSocketAddress socksaddr = (InetSocketAddress)context.getAttribute(SOCKS_ADDRESS_KEY); + if (socksaddr != null) {//make proxy server to resolve host in http url + remoteAddress = InetSocketAddress.createUnresolved(host.getHostName(), host.getPort()); + } + return super.connectSocket(connectTimeout, socket, host, remoteAddress, localAddress, context); + } + } + + static class FakeDnsResolver implements DnsResolver { + @Override + public InetAddress[] resolve(String host) throws UnknownHostException { + // Return some fake DNS record for every request, we won't be using it + try { + return new InetAddress[] {InetAddress.getByName(host)}; + } catch (Throwable e) { + return new InetAddress[] {InetAddress.getByAddress(new byte[] {0, 0, 0, 0})}; + } + } + } + + /** + * 实现 https 链接的socket 工厂 + */ + static class SocksSSLConnectionSocketFactory extends SSLConnectionSocketFactory { + public SocksSSLConnectionSocketFactory(SSLContext sslContext) { + super(sslContext, NoopHostnameVerifier.INSTANCE); + } + + @Override + public Socket createSocket(final HttpContext context) throws IOException { + InetSocketAddress socksaddr = (InetSocketAddress)context.getAttribute(SOCKS_ADDRESS_KEY); + if (socksaddr != null) { + Proxy proxy = new Proxy(Proxy.Type.SOCKS, socksaddr); + return new Socket(proxy); + } else { + return new Socket(); + } + } + + @Override + public Socket connectSocket(int connectTimeout, Socket socket, HttpHost host, InetSocketAddress remoteAddress, + InetSocketAddress localAddress, HttpContext context) throws IOException { + InetSocketAddress socksaddr = (InetSocketAddress)context.getAttribute(SOCKS_ADDRESS_KEY); + if (socksaddr != null) { + remoteAddress = InetSocketAddress.createUnresolved(host.getHostName(), host.getPort()); + } + return super.connectSocket(connectTimeout, socket, host, remoteAddress, localAddress, context); + } + } + + static class NoopHostnameVerifier implements javax.net.ssl.HostnameVerifier { + public static final NoopHostnameVerifier INSTANCE = new NoopHostnameVerifier(); + + @Override + public boolean verify(String s, SSLSession sslSession) { + return true; + } + } +} diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ApiKeyImpl.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ApiKeyImpl.java new file mode 100644 index 00000000..9ead7bdf --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ApiKeyImpl.java @@ -0,0 +1,26 @@ +package org.apache.rocketmq.connect.http.sink.auth; + +import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.connect.http.sink.entity.ClientConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class ApiKeyImpl implements Auth { + private static final Logger log = LoggerFactory.getLogger(ApiKeyImpl.class); + + @Override + public Map auth(ClientConfig config) { + Map headMap = Maps.newHashMap(); + try { + if (StringUtils.isNotBlank(config.getApiKeyName()) && StringUtils.isNotBlank(config.getApiKeyValue())) { + headMap.put(config.getApiKeyName(), config.getApiKeyValue()); + } + } catch (Exception e) { + log.error("ApiKeyImpl | auth | error => ", e); + } + return headMap; + } +} diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/Auth.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/Auth.java new file mode 100644 index 00000000..54b4970d --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/Auth.java @@ -0,0 +1,16 @@ +package org.apache.rocketmq.connect.http.sink.auth; + + +import org.apache.rocketmq.connect.http.sink.entity.ClientConfig; + +import java.util.Map; + +public interface Auth { + + /** + * Authentication abstract method + * @param config + * @return + */ + Map auth(ClientConfig config); +} diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/BasicAuthImpl.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/BasicAuthImpl.java new file mode 100644 index 00000000..4cdbfea4 --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/BasicAuthImpl.java @@ -0,0 +1,31 @@ +package org.apache.rocketmq.connect.http.sink.auth; + +import com.google.common.collect.Maps; +import com.sun.org.apache.xerces.internal.impl.dv.util.Base64; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.connect.http.sink.constant.HttpConstant; +import org.apache.rocketmq.connect.http.sink.entity.ClientConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Map; + +public class BasicAuthImpl implements Auth { + private static final Logger log = LoggerFactory.getLogger(BasicAuthImpl.class); + + + @Override + public Map auth(ClientConfig config) { + Map headMap = Maps.newHashMap(); + try { + if (StringUtils.isNotBlank(config.getBasicUser()) && StringUtils.isNotBlank(config.getBasicPassword())) { + String authorizationValue = config.getBasicUser() + ":" + config.getBasicPassword(); + headMap.put(HttpConstant.AUTHORIZATION, "Basic " + Base64.encode(authorizationValue.getBytes(StandardCharsets.UTF_8))); + } + } catch (Exception e) { + log.error("BasicAuthImpl | auth | error => ", e); + } + return headMap; + } +} diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpCallback.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpCallback.java new file mode 100644 index 00000000..800d63f5 --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpCallback.java @@ -0,0 +1,57 @@ +package org.apache.rocketmq.connect.http.sink.auth; + +import org.apache.http.concurrent.FutureCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; + +public class HttpCallback implements FutureCallback { + + private static final Logger log = LoggerFactory.getLogger(HttpCallback.class); + + private CountDownLatch countDownLatch; + + private boolean isFailed; + + private String msg; + + public HttpCallback(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override + public void completed(String s) { + countDownLatch.countDown(); + } + + public void failed(final Exception ex) { + countDownLatch.countDown(); + isFailed = true; + log.error("http request failed.", ex); + } + + public void cancelled() { + countDownLatch.countDown(); + } + + public CountDownLatch getCountDownLatch() { + return countDownLatch; + } + + public boolean isFailed() { + return isFailed; + } + + public void setFailed(boolean failed) { + isFailed = failed; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } +} diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpRequestCallable.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpRequestCallable.java new file mode 100644 index 00000000..2b19fc84 --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpRequestCallable.java @@ -0,0 +1,91 @@ +package org.apache.rocketmq.connect.http.sink.auth; + +import com.google.common.base.Strings; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.text.MessageFormat; +import java.util.concurrent.Callable; + +import static org.apache.rocketmq.connect.http.sink.constant.HttpConstant.LOG_SIFT_TAG; + + +public class HttpRequestCallable implements Callable { + + private static final Logger log = LoggerFactory.getLogger(HttpCallback.class); + + private CloseableHttpClient httpclient; + + private HttpCallback httpCallback; + + private HttpUriRequest httpUriRequest; + + private HttpClientContext context; + private SocksProxyConfig socksProxyConfig; + private String siftTag; + + private static final String SOCKS_ADDRESS_KEY = "socks.address"; + + public HttpRequestCallable(CloseableHttpClient httpclient, HttpUriRequest httpUriRequest, HttpClientContext context, + SocksProxyConfig socksProxyConfig, HttpCallback httpCallback, String siftTag) { + this.httpclient = httpclient; + this.httpUriRequest = httpUriRequest; + this.context = context; + this.socksProxyConfig = socksProxyConfig; + this.httpCallback = httpCallback; + this.siftTag = siftTag; + } + + public void loadSocks5ProxyConfig() { + if (!Strings.isNullOrEmpty(socksProxyConfig.getSocks5Endpoint())) { + String[] socksAddrAndPor = socksProxyConfig.getSocks5Endpoint() + .split(":"); + InetSocketAddress socksaddr = new InetSocketAddress(socksAddrAndPor[0], + Integer.parseInt(socksAddrAndPor[1])); + context.setAttribute(SOCKS_ADDRESS_KEY, socksaddr); + ThreadLocalProxyAuthenticator.getInstance() + .setCredentials(socksProxyConfig.getSocks5UserName(), socksProxyConfig.getSocks5Password()); + } + } + + @Override + public String call() throws Exception { + MDC.put(LOG_SIFT_TAG, siftTag); + CloseableHttpResponse response = null; + try { + Long startTime = System.currentTimeMillis(); + loadSocks5ProxyConfig(); + response = httpclient.execute(httpUriRequest, context); + if (response.getStatusLine() + .getStatusCode() / 100 != 2) { + String msg = MessageFormat.format("Http Status:{0},Msg:{1}", response.getStatusLine() + .getStatusCode(), EntityUtils.toString(response.getEntity())); + httpCallback.setMsg(msg); + httpCallback.setFailed(Boolean.TRUE); + } + log.info("The cost of one http request:{}, Connection Connection={},Keep-Alive={}", + System.currentTimeMillis() - startTime, response.getHeaders("Connection"), + response.getHeaders("Keep-Alive")); + return EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + } catch (Throwable e) { + log.error("http execute failed.", e); + httpCallback.setFailed(Boolean.TRUE); + httpCallback.setMsg(e.getLocalizedMessage()); + } finally { + httpCallback.getCountDownLatch() + .countDown(); + if (null != response.getEntity()) { + EntityUtils.consume(response.getEntity()); + } + } + return null; + } +} diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/OAuthClientImpl.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/OAuthClientImpl.java new file mode 100644 index 00000000..fe036310 --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/OAuthClientImpl.java @@ -0,0 +1,148 @@ +package org.apache.rocketmq.connect.http.sink.auth; + +import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.connect.http.sink.constant.HttpConstant; +import org.apache.rocketmq.connect.http.sink.entity.ClientConfig; +import org.apache.rocketmq.connect.http.sink.entity.HttpRequest; +import org.apache.rocketmq.connect.http.sink.entity.OAuthEntity; +import org.apache.rocketmq.connect.http.sink.entity.TokenEntity; +import org.apache.rocketmq.connect.http.sink.util.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +public class OAuthClientImpl implements Auth, Runnable { + private static final Logger log = LoggerFactory.getLogger(OAuthClientImpl.class); + public Map oauthMap; + + public void setOauthMap(Map oauthMap) { + this.oauthMap = oauthMap; + } + + @Override + public Map auth(ClientConfig config) { + Map headMap = Maps.newHashMap(); + try { + String resultToken = ""; + if (StringUtils.isNotBlank(config.getOauth2Endpoint()) + && StringUtils.isNotBlank(config.getOauth2HttpMethod())) { + OAuthEntity oAuthEntity = new OAuthEntity(); + oAuthEntity.setOauth2ClientId(config.getOauth2ClientId()); + oAuthEntity.setOauth2ClientSecret(config.getOauth2ClientSecret()); + oAuthEntity.setOauth2Endpoint(config.getOauth2Endpoint()); + oAuthEntity.setOauth2HttpMethod(config.getOauth2HttpMethod()); + oAuthEntity.setHeaderParamsters(config.getHeaderParameters()); + oAuthEntity.setQueryStringParameters(config.getQueryStringParameters()); + oAuthEntity.setTimeout(config.getTimeout()); + oAuthEntity.setHttpClient(config.getHttpClient()); + queryParameterClient(config); + final TokenEntity tokenEntity = oauthMap.get(oAuthEntity); + if (tokenEntity != null) { + headMap.put(HttpConstant.AUTHORIZATION, "Bearer " + tokenEntity.getAccessToken()); + return headMap; + } + HttpRequest httpRequest = new HttpRequest(); + resultToken = getResultToken(oAuthEntity, headMap, oAuthEntity.getHttpClient(), httpRequest); + if (StringUtils.isNotBlank(resultToken)) { + final TokenEntity token = JSONObject.parseObject(resultToken, TokenEntity.class); + if (StringUtils.isNotBlank(token.getAccessToken())) { + headMap.put(HttpConstant.AUTHORIZATION, "Bearer " + token.getAccessToken()); + token.setTokenTimestamp(Long.toString(System.currentTimeMillis())); + oauthMap.putIfAbsent(oAuthEntity, token); + } else { + throw new RuntimeException(token.getError()); + } + } + } + } catch (Exception e) { + log.error("OAuthClientImpl | auth | error => ", e); + throw new RuntimeException(e); + } + return headMap; + } + + private void queryParameterClient(ClientConfig config) { + Map map = Maps.newHashMap(); + map.put("client_id", config.getOauth2ClientId()); + map.put("client_secret", config.getOauth2ClientSecret()); + JSONObject queryString = JSONObject.parseObject(config.getQueryStringParameters()); + JSONObject jsonObject = JSONObject.parseObject(JSONObject.toJSONString(map)); + config.setQueryStringParameters(JsonUtils.mergeJson(jsonObject, queryString).toJSONString()); + } + + public String getResultToken(OAuthEntity config, Map headMap, AbstractHttpClient httpClient, HttpRequest httpRequest) throws Exception { + if (HttpConstant.POST_METHOD.equals(config.getOauth2HttpMethod()) + || HttpConstant.PUT_METHOD.equals(config.getOauth2HttpMethod()) + || HttpConstant.PATCH_METHOD.equals(config.getOauth2HttpMethod())) { + String headerParamsters = config.getHeaderParamsters(); + JSONObject jsonObject = JSONObject.parseObject(headerParamsters); + for (Map.Entry entry : jsonObject.entrySet()) { + headMap.put(entry.getKey(), (String) entry.getValue()); + } + httpRequest.setBody(StringUtils.EMPTY); + httpRequest.setUrl(JsonUtils.queryStringAndPathValue(config.getOauth2Endpoint(), config.getQueryStringParameters(), null)); + httpRequest.setMethod(config.getOauth2HttpMethod()); + httpRequest.setTimeout(config.getTimeout()); + httpRequest.setHeaderMap(headMap); + } else { + httpRequest.setUrl(JsonUtils.queryStringAndPathValue(config.getOauth2Endpoint(), config.getQueryStringParameters(), null)); + httpRequest.setTimeout(config.getTimeout()); + httpRequest.setMethod(config.getOauth2HttpMethod()); + httpRequest.setHeaderMap(headMap); + httpRequest.setBody(StringUtils.EMPTY); + } + CountDownLatch countDownLatch = new CountDownLatch(1); + HttpCallback httpCallback = new HttpCallback(countDownLatch); + return httpClient.execute(httpRequest, httpCallback); + } + + @Override + public void run() { + log.info("OAuthTokenRunnable | run"); + oauthMap.forEach((oAuthEntity1, tokenEntity) -> { + String resultToken = ""; + long tokenTimestamp = Long.parseLong(tokenEntity.getTokenTimestamp()) + (tokenEntity.getExpiresIn() * 1000L); + log.info("OAuthTokenRunnable | run | tokenTimestamp : {} | system.currentTimeMillis : {} | boolean : {}", tokenTimestamp, System.currentTimeMillis(), System.currentTimeMillis() > tokenTimestamp); + if (System.currentTimeMillis() > tokenTimestamp) { + log.info("OAuthTokenRunnable | run | update token"); + HttpRequest httpRequest = new HttpRequest(); + try { + resultToken = this.getResultToken(oAuthEntity1, new HashMap<>(16), oAuthEntity1.getHttpClient(), httpRequest); + } catch (Exception e) { + log.error("OAuthTokenRunnable | update token | scheduledExecutorService | error => ", e); + throw new RuntimeException(e); + } + if (StringUtils.isNotBlank(resultToken)) { + final TokenEntity token = JSONObject.parseObject(resultToken, TokenEntity.class); + if (StringUtils.isNotBlank(token.getAccessToken())) { + oauthMap.putIfAbsent(oAuthEntity1, updateTokenEntity(tokenEntity, token)); + } else { + throw new RuntimeException(token.getError()); + } + } + } + }); + } + + private TokenEntity updateTokenEntity(TokenEntity oldTokenEntity, TokenEntity newTokenEntity) { + if (newTokenEntity != null) { + if (StringUtils.isNotBlank(newTokenEntity.getAccessToken())) { + oldTokenEntity.setAccessToken(newTokenEntity.getAccessToken()); + } + oldTokenEntity.setExpiresIn(newTokenEntity.getExpiresIn()); + if (StringUtils.isNotBlank(newTokenEntity.getScope())) { + oldTokenEntity.setScope(newTokenEntity.getScope()); + } + if (StringUtils.isNotBlank(newTokenEntity.getTokenType())) { + oldTokenEntity.setTokenType(newTokenEntity.getTokenType()); + } + } + oldTokenEntity.setTokenTimestamp(Long.toString(System.currentTimeMillis())); + return oldTokenEntity; + } +} diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/SocksProxyConfig.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/SocksProxyConfig.java new file mode 100644 index 00000000..ecd59dcc --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/SocksProxyConfig.java @@ -0,0 +1,38 @@ +package org.apache.rocketmq.connect.http.sink.auth; + +public class SocksProxyConfig { + + private String socks5Endpoint; + private String socks5UserName; + private String socks5Password; + + public SocksProxyConfig(String socks5Endpoint, String socks5UserName, String socks5Password) { + this.socks5Endpoint = socks5Endpoint; + this.socks5UserName = socks5UserName; + this.socks5Password = socks5Password; + } + + public String getSocks5Endpoint() { + return socks5Endpoint; + } + + public void setSocks5Endpoint(String socks5Endpoint) { + this.socks5Endpoint = socks5Endpoint; + } + + public String getSocks5UserName() { + return socks5UserName; + } + + public void setSocks5UserName(String socks5UserName) { + this.socks5UserName = socks5UserName; + } + + public String getSocks5Password() { + return socks5Password; + } + + public void setSocks5Password(String socks5Password) { + this.socks5Password = socks5Password; + } +} diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ThreadLocalProxyAuthenticator.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ThreadLocalProxyAuthenticator.java new file mode 100644 index 00000000..3faca6a8 --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/ThreadLocalProxyAuthenticator.java @@ -0,0 +1,30 @@ +package org.apache.rocketmq.connect.http.sink.auth; + +import java.net.Authenticator; +import java.net.PasswordAuthentication; + +public class ThreadLocalProxyAuthenticator extends Authenticator{ + + private ThreadLocal credential = new ThreadLocal(); + + private static class SingletonHolder { + private static final ThreadLocalProxyAuthenticator instance = new ThreadLocalProxyAuthenticator(); + } + + public static final ThreadLocalProxyAuthenticator getInstance() { + return SingletonHolder.instance; + } + + public void setCredentials(String user, String password) { + credential.set(new PasswordAuthentication(user, password.toCharArray())); + Authenticator.setDefault(this); + } + + + + @Override + public PasswordAuthentication getPasswordAuthentication() { + return credential.get(); + } + +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/common/OkHttpUtils.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/common/OkHttpUtils.java deleted file mode 100644 index 2c3fe632..00000000 --- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/common/OkHttpUtils.java +++ /dev/null @@ -1,274 +0,0 @@ -package org.apache.rocketmq.connect.http.sink.common; - -import com.alibaba.fastjson.JSON; -import okhttp3.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocketFactory; -import javax.net.ssl.TrustManager; -import javax.net.ssl.X509TrustManager; -import java.io.IOException; -import java.net.URLEncoder; -import java.security.SecureRandom; -import java.security.cert.X509Certificate; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; - -public class OkHttpUtils { - private static final Logger log = LoggerFactory.getLogger(OkHttpUtils.class); - - private static volatile OkHttpClient okHttpClient = null; - private static volatile Semaphore semaphore = null; - private Map headerMap; - private Map paramMap; - private String url; - private Request.Builder request; - - private OkHttpUtils() { - if (okHttpClient == null) { - synchronized (OkHttpUtils.class) { - if (okHttpClient == null) { - TrustManager[] trustManagers = buildTrustManagers(); - okHttpClient = new OkHttpClient.Builder() - .connectTimeout(15, TimeUnit.SECONDS) - .writeTimeout(20, TimeUnit.SECONDS) - .readTimeout(20, TimeUnit.SECONDS) - .sslSocketFactory(createSSLSocketFactory(trustManagers), (X509TrustManager) trustManagers[0]) - .hostnameVerifier((hostName, session) -> true) - .retryOnConnectionFailure(true) - .build(); - addHeader("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"); - } - } - } - } - - private static Semaphore getSemaphoreInstance() { - synchronized (OkHttpUtils.class) { - if (semaphore == null) { - semaphore = new Semaphore(0); - } - } - return semaphore; - } - - public static OkHttpUtils builder() { - return new OkHttpUtils(); - } - - public OkHttpUtils url(String url) { - this.url = url; - return this; - } - - /** - * 添加参数 - * - * @param key 参数名 - * @param value 参数值 - * @return - */ - public OkHttpUtils addParam(String key, String value) { - if (paramMap == null) { - paramMap = new LinkedHashMap<>(16); - } - paramMap.put(key, value); - return this; - } - - /** - * 添加请求头 - * - * @param key 参数名 - * @param value 参数值 - * @return - */ - public OkHttpUtils addHeader(String key, String value) { - if (headerMap == null) { - headerMap = new LinkedHashMap<>(16); - } - headerMap.put(key, value); - return this; - } - - public OkHttpUtils get() { - request = new Request.Builder().get(); - StringBuilder urlBuilder = new StringBuilder(url); - if (paramMap != null) { - urlBuilder.append("?"); - try { - for (Map.Entry entry : paramMap.entrySet()) { - urlBuilder.append(URLEncoder.encode(entry.getKey(), "utf-8")). - append("="). - append(URLEncoder.encode(entry.getValue(), "utf-8")). - append("&"); - } - } catch (Exception e) { - log.error("OkHttpUtils | get | error => ", e); - } - urlBuilder.deleteCharAt(urlBuilder.length() - 1); - } - request.url(urlBuilder.toString()); - return this; - } - - /** - * 初始化post方法 - * - * @param isJsonPost true等于json的方式提交数据,类似postman里post方法的raw - * false等于普通的表单提交 - * @return - */ - public OkHttpUtils post(boolean isJsonPost) { - RequestBody requestBody; - if (isJsonPost) { - String json = ""; - if (paramMap != null) { - json = JSON.toJSONString(paramMap); - } - requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), json); - } else { - FormBody.Builder formBody = new FormBody.Builder(); - if (paramMap != null) { - paramMap.forEach(formBody::add); - } - requestBody = formBody.build(); - } - request = new Request.Builder().post(requestBody).url(url); - return this; - } - - /** - * 同步请求 - * - * @return - */ - public String sync() { - setHeader(request); - try { - Response response = okHttpClient.newCall(request.build()).execute(); - assert response.body() != null; - return response.body().string(); - } catch (IOException e) { - log.error("OkHttpUtils | sync | error => ", e); - return "请求失败:" + e.getMessage(); - } - } - - /** - * 异步请求,有返回值 - */ - public String async() { - StringBuilder buffer = new StringBuilder(""); - setHeader(request); - okHttpClient.newCall(request.build()).enqueue(new Callback() { - @Override - public void onFailure(Call call, IOException e) { - buffer.append("请求出错:").append(e.getMessage()); - } - - @Override - public void onResponse(Call call, Response response) throws IOException { - assert response.body() != null; - buffer.append(response.body().string()); - getSemaphoreInstance().release(); - } - }); - try { - getSemaphoreInstance().acquire(); - } catch (InterruptedException e) { - log.error("OkHttpUtils | async | error => ", e); - } - return buffer.toString(); - } - - /** - * 异步请求,带有接口回调 - * - * @param callBack - */ - public void async(ICallBack callBack) { - setHeader(request); - okHttpClient.newCall(request.build()).enqueue(new Callback() { - @Override - public void onFailure(Call call, IOException e) { - callBack.onFailure(call, e.getMessage()); - } - - @Override - public void onResponse(Call call, Response response) throws IOException { - assert response.body() != null; - callBack.onSuccessful(call, response.body().string()); - } - }); - } - - /** - * 为request添加请求头 - * - * @param request - */ - private void setHeader(Request.Builder request) { - if (headerMap != null) { - try { - for (Map.Entry entry : headerMap.entrySet()) { - request.addHeader(entry.getKey(), entry.getValue()); - } - } catch (Exception e) { - log.error("OkHttpUtils | setHeader | error => ", e); - } - } - } - - - /** - * 生成安全套接字工厂,用于https请求的证书跳过 - * - * @return - */ - private static SSLSocketFactory createSSLSocketFactory(TrustManager[] trustAllCerts) { - SSLSocketFactory ssfFactory = null; - try { - SSLContext sc = SSLContext.getInstance("SSL"); - sc.init(null, trustAllCerts, new SecureRandom()); - ssfFactory = sc.getSocketFactory(); - } catch (Exception e) { - log.error("OkHttpUtils | createSSLSocketFactory | error => ", e); - } - return ssfFactory; - } - - private static TrustManager[] buildTrustManagers() { - return new TrustManager[]{ - new X509TrustManager() { - @Override - public void checkClientTrusted(X509Certificate[] chain, String authType) { - } - - @Override - public void checkServerTrusted(X509Certificate[] chain, String authType) { - } - - @Override - public X509Certificate[] getAcceptedIssuers() { - return new X509Certificate[]{}; - } - } - }; - } - - /** - * 自定义一个接口回调 - */ - public interface ICallBack { - - void onSuccessful(Call call, String data); - - void onFailure(Call call, String errorMsg); - - } -} diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/constant/AuthTypeEnum.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/constant/AuthTypeEnum.java new file mode 100644 index 00000000..91d820ee --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/constant/AuthTypeEnum.java @@ -0,0 +1,27 @@ +package org.apache.rocketmq.connect.http.sink.constant; + + +public enum AuthTypeEnum { + + /** + * BASIC + */ + BASIC("BASIC_AUTH"), + /** + * OAUTH_CLIENT_CREDENTIALS + */ + OAUTH_CLIENT_CREDENTIALS("OAUTH_AUTH"), + /** + * API_KEY + */ + API_KEY("API_KEY_AUTH"); + private final String authType; + + AuthTypeEnum(String authType) { + this.authType = authType; + } + + public String getAuthType() { + return authType; + } +} diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/constant/HttpConstant.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/constant/HttpConstant.java index 09532b04..85ff2d00 100644 --- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/constant/HttpConstant.java +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/constant/HttpConstant.java @@ -2,7 +2,34 @@ public class HttpConstant { - public static final String URL_CONSTANT = "url"; + public static final String URL_PATTERN_CONSTANT = "urlPattern"; + public static final String METHOD_CONSTANT = "method"; + public static final String QUERY_STRING_PARAMETERS_CONSTANT = "queryStringParameters"; + public static final String HEADER_PARAMETERS_CONSTANT = "headerParameters"; + public static final String BODYS_CONSTANT = "bodys"; + public static final String AUTH_TYPE_CONSTANT = "authType"; + public static final String BASIC_USER_CONSTANT = "basicUser"; + public static final String BASIC_PASSWORD_CONSTANT = "basicPassword"; + public static final String OAUTH2_ENDPOINT_CONSTANT = "oauth2Endpoint"; + public static final String OAUTH2_CLIENTID_CONSTANT = "oauth2ClientId"; + public static final String OAUTH2_CLIENTSECRET_CONSTANT = "oauth2ClientSecret"; + public static final String OAUTH2_HTTP_METHOD_CONSTANT = "oauth2HttpMethod"; + public static final String PROXY_TYPE_CONSTANT = "proxyType"; + public static final String PROXY_HOST_CONSTANT = "proxyHost"; + public static final String PROXY_PORT_CONSTANT = "proxyPort"; + public static final String PROXY_USER_CONSTANT = "proxyUser"; + public static final String PROXY_PASSWORD_CONSTANT = "proxyPassword"; + public static final String TIMEOUT_CONSTANT = "timeout"; + public static final String HTTP_PATH_VALUE = "httpPathValue"; + public static final String HTTP_QUERY_VALUE = "httpQuery"; + public static final String HTTP_METHOD = "httpMethod"; + public static final String HTTP_HEADER = "httpHeader"; + public static final String POST_METHOD = "POST"; + public static final String PUT_METHOD = "PUT"; + public static final String PATCH_METHOD = "PATCH"; + public static final String AUTHORIZATION = "Authorization"; + public static final String API_KEY_NAME = "apiKeyName"; + public static final String API_KEY_VALUE = "apiKeyValue"; + public static final String LOG_SIFT_TAG = "SIFT-TAG"; - public static final String DATA_CONSTANT = "data"; } diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/ClientConfig.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/ClientConfig.java new file mode 100644 index 00000000..74cb5e72 --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/ClientConfig.java @@ -0,0 +1,206 @@ +package org.apache.rocketmq.connect.http.sink.entity; + + +import org.apache.rocketmq.connect.http.sink.auth.AbstractHttpClient; + +public class ClientConfig { + + private String urlPattern; + private String method; + private String queryStringParameters; + private String headerParameters; + private String bodys; + private String httpPathValue; + private String authType; + private String basicUser; + private String basicPassword; + private String apiKeyName; + private String apiKeyValue; + private String oauth2Endpoint; + private String oauth2ClientId; + private String oauth2ClientSecret; + private String oauth2HttpMethod; + private String proxyType; + private String proxyHost; + private String proxyPort; + private String proxyUser; + private String proxyPassword; + private String timeout; + private AbstractHttpClient httpClient; + + public String getProxyType() { + return proxyType; + } + + public void setProxyType(String proxyType) { + this.proxyType = proxyType; + } + + public String getProxyHost() { + return proxyHost; + } + + public void setProxyHost(String proxyHost) { + this.proxyHost = proxyHost; + } + + public String getProxyPort() { + return proxyPort; + } + + public void setProxyPort(String proxyPort) { + this.proxyPort = proxyPort; + } + + public String getProxyUser() { + return proxyUser; + } + + public void setProxyUser(String proxyUser) { + this.proxyUser = proxyUser; + } + + public String getProxyPassword() { + return proxyPassword; + } + + public void setProxyPassword(String proxyPassword) { + this.proxyPassword = proxyPassword; + } + + public String getUrlPattern() { + return urlPattern; + } + + public void setUrlPattern(String urlPattern) { + this.urlPattern = urlPattern; + } + + public String getMethod() { + return method; + } + + public void setMethod(String method) { + this.method = method; + } + + public String getQueryStringParameters() { + return queryStringParameters; + } + + public void setQueryStringParameters(String queryStringParameters) { + this.queryStringParameters = queryStringParameters; + } + + public String getHeaderParameters() { + return headerParameters; + } + + public void setHeaderParameters(String headerParameters) { + this.headerParameters = headerParameters; + } + + public String getBodys() { + return bodys; + } + + public void setBodys(String bodys) { + this.bodys = bodys; + } + + public String getAuthType() { + return authType; + } + + public void setAuthType(String authType) { + this.authType = authType; + } + + public String getBasicUser() { + return basicUser; + } + + public void setBasicUser(String basicUser) { + this.basicUser = basicUser; + } + + public String getBasicPassword() { + return basicPassword; + } + + public void setBasicPassword(String basicPassword) { + this.basicPassword = basicPassword; + } + + public String getOauth2Endpoint() { + return oauth2Endpoint; + } + + public void setOauth2Endpoint(String oauth2Endpoint) { + this.oauth2Endpoint = oauth2Endpoint; + } + + public String getOauth2ClientId() { + return oauth2ClientId; + } + + public void setOauth2ClientId(String oauth2ClientId) { + this.oauth2ClientId = oauth2ClientId; + } + + public String getOauth2ClientSecret() { + return oauth2ClientSecret; + } + + public void setOauth2ClientSecret(String oauth2ClientSecret) { + this.oauth2ClientSecret = oauth2ClientSecret; + } + + public String getOauth2HttpMethod() { + return oauth2HttpMethod; + } + + public String getHttpPathValue() { + return httpPathValue; + } + + public void setHttpPathValue(String httpPathValue) { + this.httpPathValue = httpPathValue; + } + + public void setOauth2HttpMethod(String oauth2HttpMethod) { + this.oauth2HttpMethod = oauth2HttpMethod; + } + + public String getTimeout() { + return timeout; + } + + public void setTimeout(String timeout) { + this.timeout = timeout; + } + + public String getApiKeyName() { + return apiKeyName; + } + + public void setApiKeyName(String apiKeyName) { + this.apiKeyName = apiKeyName; + } + + public String getApiKeyValue() { + return apiKeyValue; + } + + public void setApiKeyValue(String apiKeyValue) { + this.apiKeyValue = apiKeyValue; + } + + public AbstractHttpClient getHttpClient() { + return httpClient; + } + + public void setHttpClient(AbstractHttpClient httpClient) { + this.httpClient = httpClient; + } +} diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/HttpRequest.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/HttpRequest.java new file mode 100644 index 00000000..d9f5aa9e --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/HttpRequest.java @@ -0,0 +1,52 @@ +package org.apache.rocketmq.connect.http.sink.entity; + +import java.util.Map; + +public class HttpRequest { + + private String url; + private String method; + private Map headerMap; + private String body; + private String timeout; + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public String getMethod() { + return method; + } + + public void setMethod(String method) { + this.method = method; + } + + public Map getHeaderMap() { + return headerMap; + } + + public void setHeaderMap(Map headerMap) { + this.headerMap = headerMap; + } + + public String getBody() { + return body; + } + + public void setBody(String body) { + this.body = body; + } + + public String getTimeout() { + return timeout; + } + + public void setTimeout(String timeout) { + this.timeout = timeout; + } +} diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/OAuthEntity.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/OAuthEntity.java new file mode 100644 index 00000000..b08a9a1b --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/OAuthEntity.java @@ -0,0 +1,111 @@ +package org.apache.rocketmq.connect.http.sink.entity; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.rocketmq.connect.http.sink.auth.AbstractHttpClient; + +public class OAuthEntity { + private String oauth2Endpoint; + private String oauth2ClientId; + private String oauth2ClientSecret; + private String oauth2HttpMethod; + private String queryStringParameters; + private String headerParamsters; + private String timeout; + + private AbstractHttpClient httpClient; + + public String getTimeout() { + return timeout; + } + + public void setTimeout(String timeout) { + this.timeout = timeout; + } + + public String getOauth2Endpoint() { + return oauth2Endpoint; + } + + public void setOauth2Endpoint(String oauth2Endpoint) { + this.oauth2Endpoint = oauth2Endpoint; + } + + public String getOauth2ClientId() { + return oauth2ClientId; + } + + public void setOauth2ClientId(String oauth2ClientId) { + this.oauth2ClientId = oauth2ClientId; + } + + public String getOauth2ClientSecret() { + return oauth2ClientSecret; + } + + public void setOauth2ClientSecret(String oauth2ClientSecret) { + this.oauth2ClientSecret = oauth2ClientSecret; + } + + public String getOauth2HttpMethod() { + return oauth2HttpMethod; + } + + public void setOauth2HttpMethod(String oauth2HttpMethod) { + this.oauth2HttpMethod = oauth2HttpMethod; + } + + public String getQueryStringParameters() { + return queryStringParameters; + } + + public void setQueryStringParameters(String queryStringParameters) { + this.queryStringParameters = queryStringParameters; + } + + public String getHeaderParamsters() { + return headerParamsters; + } + + public void setHeaderParamsters(String headerParamsters) { + this.headerParamsters = headerParamsters; + } + + public AbstractHttpClient getHttpClient() { + return httpClient; + } + + public void setHttpClient(AbstractHttpClient httpClient) { + this.httpClient = httpClient; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + + if (o == null || getClass() != o.getClass()) return false; + + OAuthEntity that = (OAuthEntity) o; + + return new EqualsBuilder().append(oauth2Endpoint, that.oauth2Endpoint).append(oauth2ClientId, that.oauth2ClientId).append(oauth2ClientSecret, that.oauth2ClientSecret).append(oauth2HttpMethod, that.oauth2HttpMethod).append(queryStringParameters, that.queryStringParameters).append(headerParamsters, that.headerParamsters).isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37).append(oauth2Endpoint).append(oauth2ClientId).append(oauth2ClientSecret).append(oauth2HttpMethod).append(queryStringParameters).append(headerParamsters).toHashCode(); + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("oauth2Endpoint", oauth2Endpoint) + .append("oauth2ClientId", oauth2ClientId) + .append("oauth2ClientSecret", oauth2ClientSecret) + .append("oauth2HttpMethod", oauth2HttpMethod) + .append("queryStringParameters", queryStringParameters) + .append("headerParamsters", headerParamsters) + .append("timeout", timeout) + .toString(); + } +} diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/TokenEntity.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/TokenEntity.java new file mode 100644 index 00000000..11d59e9c --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/entity/TokenEntity.java @@ -0,0 +1,124 @@ +package org.apache.rocketmq.connect.http.sink.entity; + +import org.apache.commons.lang3.builder.ToStringBuilder; + +public class TokenEntity { + + private String accessToken; + private String tokenType; + private int expiresIn; + private String exampleParameter; + private String timestamp; + private String status; + private String error; + private String message; + private String path; + private String tokenTimestamp; + + private String scope; + + public String getAccessToken() { + return accessToken; + } + + public void setAccessToken(String accessToken) { + this.accessToken = accessToken; + } + + public String getTokenType() { + return tokenType; + } + + public void setTokenType(String tokenType) { + this.tokenType = tokenType; + } + + public int getExpiresIn() { + return expiresIn; + } + + public void setExpiresIn(int expiresIn) { + this.expiresIn = expiresIn; + } + + public String getTokenTimestamp() { + return tokenTimestamp; + } + + public void setTokenTimestamp(String tokenTimestamp) { + this.tokenTimestamp = tokenTimestamp; + } + + public String getExampleParameter() { + return exampleParameter; + } + + public void setExampleParameter(String exampleParameter) { + this.exampleParameter = exampleParameter; + } + + public String getTimestamp() { + return timestamp; + } + + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public String getError() { + return error; + } + + public void setError(String error) { + this.error = error; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + + public String getScope() { + return scope; + } + + public void setScope(String scope) { + this.scope = scope; + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("accessToken", accessToken) + .append("tokenType", tokenType) + .append("expiresIn", expiresIn) + .append("exampleParameter", exampleParameter) + .append("timestamp", timestamp) + .append("status", status) + .append("error", error) + .append("message", message) + .append("path", path) + .append("tokenTimestamp", tokenTimestamp) + .append("scope", scope) + .toString(); + } +} diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/util/CheckUtils.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/util/CheckUtils.java new file mode 100644 index 00000000..4cbf39fc --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/util/CheckUtils.java @@ -0,0 +1,33 @@ +package org.apache.rocketmq.connect.http.sink.util; + +import org.apache.commons.lang3.StringUtils; + +public class CheckUtils { + + private static final String NULL_CONSTANT = "null"; + + public static Boolean checkNull(String check) { + if (StringUtils.isBlank(check)) { + return Boolean.TRUE; + } + if (StringUtils.isNotBlank(check) && NULL_CONSTANT.equalsIgnoreCase(check)) { + return Boolean.TRUE; + } + return Boolean.FALSE; + } + + public static Boolean checkNotNull(String check) { + if (StringUtils.isNotBlank(check) && !NULL_CONSTANT.equalsIgnoreCase(check)) { + return Boolean.TRUE; + } + return Boolean.FALSE; + } + + public static String checkNullReturnDefault(String check) { + if (NULL_CONSTANT.equalsIgnoreCase(check)) { + return null; + } + return check; + } + +} diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/util/JsonUtils.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/util/JsonUtils.java new file mode 100644 index 00000000..f78202cd --- /dev/null +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/util/JsonUtils.java @@ -0,0 +1,76 @@ +package org.apache.rocketmq.connect.http.sink.util; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.connect.http.sink.constant.HttpConstant; + +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.Map.Entry; + +public class JsonUtils { + + private static final String questionMark = "?"; + + public static JSONObject mergeJson(JSONObject source, JSONObject target) { + if (target == null) { + return source; + } + if (source == null) { + return target; + } + for (String key : source.keySet()) { + Object value = source.get(key); + if (!target.containsKey(key)) { + target.put(key, value); + } else { + if (value instanceof JSONObject) { + JSONObject valueJson = (JSONObject) value; + JSONObject targetValue = mergeJson(valueJson, target.getJSONObject(key)); + target.put(key, targetValue); + } else if (value instanceof JSONArray) { + JSONArray valueArray = (JSONArray) value; + for (int i = 0; i < valueArray.size(); i++) { + JSONObject obj = (JSONObject) valueArray.get(i); + JSONObject targetValue = mergeJson(obj, (JSONObject) target.getJSONArray(key).get(i)); + target.getJSONArray(key).set(i, targetValue); + } + } else { + target.put(key, value); + } + } + } + return target; + } + + public static String queryStringAndPathValue(String url, String queryString, String pathValue) throws UnsupportedEncodingException { + StringBuilder pathValueString = new StringBuilder(); + if (StringUtils.isNotBlank(pathValue)) { + final JSONArray objects = JSONArray.parseArray(pathValue); + for (Object object : objects) { + pathValueString.append(HttpConstant.HTTP_PATH_VALUE) + .append("=").append(object).append("&"); + } + } + StringBuilder queryStringBuilder = new StringBuilder(); + if (StringUtils.isNotBlank(queryString)) { + final JSONObject jsonObject = JSONObject.parseObject(queryString); + for (Entry next : jsonObject.entrySet()) { + if (next.getValue() instanceof JSONObject) { + queryStringBuilder.append(next.getKey()).append("=").append(URLEncoder.encode(((JSONObject) next.getValue()).toJSONString(), "UTF-8")).append("&"); + } else { + queryStringBuilder.append(next.getKey()).append("=").append(URLEncoder.encode((String) next.getValue(), "UTF-8")).append("&"); + } + } + } + String path = pathValueString + queryStringBuilder.toString(); + if (StringUtils.isNotBlank(path) && StringUtils.isNotBlank(url)) { + if (url.contains(questionMark)) { + return url + "&" + path.substring(0, path.length() - 1); + } + return url + "?" + path.substring(0, path.length() - 1); + } + return url; + } +} diff --git a/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnectorTest.java b/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnectorTest.java index d75e4454..8f6e3ffd 100644 --- a/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnectorTest.java +++ b/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnectorTest.java @@ -18,25 +18,4 @@ public class HttpSinkConnectorTest { public void testTaskConfigs() { Assert.assertEquals(httpSinkConnector.taskConfigs(1).size(), 1); } - - @Test - public void testPut() { - HttpSinkTask httpSinkTask = new HttpSinkTask(); - KeyValue keyValue = new DefaultKeyValue(); - keyValue.put(HttpConstant.URL_CONSTANT, "http://127.0.0.1:8081/demo"); - httpSinkTask.init(keyValue); - List connectRecordList = new ArrayList<>(); - ConnectRecord connectRecord = new ConnectRecord(null ,null, System.currentTimeMillis()); - connectRecord.setData("test"); - connectRecordList.add(connectRecord); - httpSinkTask.put(connectRecordList); - } - - @Test(expected = RuntimeException.class) - public void testValidate() { - KeyValue keyValue = new DefaultKeyValue(); - // 需要添加测试的http地址 - keyValue.put(HttpConstant.URL_CONSTANT, "http://127.0.0.1"); - httpSinkConnector.validate(keyValue); - } } diff --git a/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkTaskTest.java b/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkTaskTest.java new file mode 100644 index 00000000..7bcf398e --- /dev/null +++ b/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkTaskTest.java @@ -0,0 +1,82 @@ +package org.apache.rocketmq.connect.http.sink; + +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.rocketmq.connect.http.sink.constant.AuthTypeEnum; +import org.apache.rocketmq.connect.http.sink.constant.HttpConstant; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; + +public class HttpSinkTaskTest { + + private final HttpSinkTask httpSinkTask = new HttpSinkTask(); + + @Test(expected = RuntimeException.class) + public void testPutBasic() { + KeyValue keyValue = new DefaultKeyValue(); + String apiDestinationName = UUID.randomUUID().toString(); + keyValue.put(HttpConstant.URL_PATTERN_CONSTANT, "http://127.0.0.1:7001/xxxx?id=" + apiDestinationName); + keyValue.put(HttpConstant.METHOD_CONSTANT, "POST"); + keyValue.put(HttpConstant.BASIC_USER_CONSTANT, "xxxx"); + keyValue.put(HttpConstant.BASIC_PASSWORD_CONSTANT, "xxxx"); + keyValue.put(HttpConstant.AUTH_TYPE_CONSTANT, AuthTypeEnum.BASIC.getAuthType()); + httpSinkTask.setScheduledExecutorService(Executors.newSingleThreadScheduledExecutor()); + httpSinkTask.start(keyValue); + List sinkRecords = new ArrayList<>(11); + ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis()); + sinkRecords.add(connectRecord); + httpSinkTask.put(sinkRecords); + } + + @Test(expected = RuntimeException.class) + public void testPutApiKey() { + KeyValue keyValue = new DefaultKeyValue(); + String apiDestinationName = UUID.randomUUID().toString(); + keyValue.put(HttpConstant.URL_PATTERN_CONSTANT, "http://127.0.0.1:7001/xxxx?id=" + apiDestinationName); + keyValue.put(HttpConstant.METHOD_CONSTANT, "POST"); + keyValue.put(HttpConstant.API_KEY_NAME, "Token"); + keyValue.put(HttpConstant.API_KEY_VALUE, "xxxx"); + keyValue.put(HttpConstant.AUTH_TYPE_CONSTANT, AuthTypeEnum.API_KEY.getAuthType()); + httpSinkTask.setScheduledExecutorService(Executors.newSingleThreadScheduledExecutor()); + httpSinkTask.start(keyValue); + List sinkRecords = new ArrayList<>(11); + ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis()); + sinkRecords.add(connectRecord); + httpSinkTask.put(sinkRecords); + } + + @Test(expected = RuntimeException.class) + public void testPutOAuth2() { + KeyValue keyValue = new DefaultKeyValue(); + String apiDestinationName = UUID.randomUUID().toString(); + keyValue.put(HttpConstant.URL_PATTERN_CONSTANT, "http://127.0.0.1:7001/xxxx?id=" + apiDestinationName); + keyValue.put(HttpConstant.METHOD_CONSTANT, "POST"); + keyValue.put(HttpConstant.OAUTH2_CLIENTID_CONSTANT, "clientId"); + keyValue.put(HttpConstant.OAUTH2_CLIENTSECRET_CONSTANT, "clientSecret"); + keyValue.put(HttpConstant.OAUTH2_HTTP_METHOD_CONSTANT, "POST"); + keyValue.put(HttpConstant.OAUTH2_ENDPOINT_CONSTANT, "http://127.0.0.1:7001/oauth/token"); + keyValue.put(HttpConstant.AUTH_TYPE_CONSTANT, AuthTypeEnum.OAUTH_CLIENT_CREDENTIALS.getAuthType()); + Map queryStringParameters = Maps.newHashMap(); + queryStringParameters.put("grant_type", "xxxx"); + queryStringParameters.put("scope", "xxxx"); + keyValue.put(HttpConstant.QUERY_STRING_PARAMETERS_CONSTANT, new Gson().toJson(queryStringParameters)); + Map headerParameters = Maps.newHashMap(); + headerParameters.put("Content-Type", "xxxx"); + headerParameters.put("Authorization", "xxxx"); + keyValue.put(HttpConstant.HEADER_PARAMETERS_CONSTANT, new Gson().toJson(headerParameters)); + httpSinkTask.setScheduledExecutorService(Executors.newSingleThreadScheduledExecutor()); + httpSinkTask.start(keyValue); + List sinkRecords = new ArrayList<>(11); + ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis()); + sinkRecords.add(connectRecord); + httpSinkTask.put(sinkRecords); + } +} From bab09e9bbb7e80040d20f98717d5cdc48dd6935c Mon Sep 17 00:00:00 2001 From: zh378814 Date: Thu, 1 Dec 2022 10:50:04 +0800 Subject: [PATCH 2/3] update validate --- .../org/apache/rocketmq/connect/http/sink/HttpSinkTask.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java index 70841509..fc5a6b0d 100644 --- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java @@ -150,8 +150,7 @@ private void addHeaderMap(Map headerMap, ClientConfig clientConf @Override public void validate(KeyValue config) { if (CheckUtils.checkNull(config.getString(HttpConstant.URL_PATTERN_CONSTANT)) - || CheckUtils.checkNull(config.getString(HttpConstant.AUTH_TYPE_CONSTANT)) - || CheckUtils.checkNull(config.getString(HttpConstant.BODYS_CONSTANT))) { + || CheckUtils.checkNull(config.getString(HttpConstant.METHOD_CONSTANT))) { throw new RuntimeException("http required parameter is null !"); } final List collect = Arrays.stream(AuthTypeEnum.values()).filter(authTypeEnum -> authTypeEnum.getAuthType().equals(config.getString(HttpConstant.AUTH_TYPE_CONSTANT))).collect(Collectors.toList()); From ba353bad1cdc70786a43ece5c956badd994b7d5b Mon Sep 17 00:00:00 2001 From: zh378814 Date: Fri, 2 Dec 2022 10:38:40 +0800 Subject: [PATCH 3/3] update result --- .../rocketmq/connect/http/sink/auth/HttpRequestCallable.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpRequestCallable.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpRequestCallable.java index 2b19fc84..87ec8f40 100644 --- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpRequestCallable.java +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/auth/HttpRequestCallable.java @@ -64,17 +64,18 @@ public String call() throws Exception { Long startTime = System.currentTimeMillis(); loadSocks5ProxyConfig(); response = httpclient.execute(httpUriRequest, context); + String result = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); if (response.getStatusLine() .getStatusCode() / 100 != 2) { String msg = MessageFormat.format("Http Status:{0},Msg:{1}", response.getStatusLine() - .getStatusCode(), EntityUtils.toString(response.getEntity())); + .getStatusCode(), result); httpCallback.setMsg(msg); httpCallback.setFailed(Boolean.TRUE); } log.info("The cost of one http request:{}, Connection Connection={},Keep-Alive={}", System.currentTimeMillis() - startTime, response.getHeaders("Connection"), response.getHeaders("Keep-Alive")); - return EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + return result; } catch (Throwable e) { log.error("http execute failed.", e); httpCallback.setFailed(Boolean.TRUE);