diff --git a/connectors/rocketmq-connect-http/README.md b/connectors/rocketmq-connect-http/README.md
index 8b6d5c704..7cb845dd4 100644
--- a/connectors/rocketmq-connect-http/README.md
+++ b/connectors/rocketmq-connect-http/README.md
@@ -4,26 +4,48 @@
Be responsible for consuming messages from producer and writing data to another web service system.
```
-## rocketmq-connect-http 打包
-```
-mvn clean install -Dmaven.test.skip=true
-```
+## rocketmq-connect-http使用方法
+
+1. 进入想要使用的connectors目录下(以rocketmq-connect-http目录为例),使用以下指令将插件进行打包
+ ```shell
+ mvn clean package -Dmaven.test.skip=true
+ ```
+2. 打包好的插件以jar包的模式出现在`rocketmq-connect-http/target/`目录下
+
+3. 在`distribution/conf`目录下找的对应的配置文件进行更新,对于standalone的启动方式,更新`connect-standalone.conf`文件中的`pluginPaths`变量
+
+ ```
+ pluginPaths=(you plugin path)
+ ```
+
+ 相应的,使用distributed启动方式,则更新`connect-distributed.conf`中的变量
+4. 创建并启动对应的`SourceConnector`以及`SinkConnector`
+
## rocketmq-connect-http 启动
* **http-sink-connector** 启动
```
-http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-sink-connector-name}
-?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname" : "${connect-topicname}","url":"${url}"}
+POST http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-source-connector-name}
+{
+ "connector.class":"org.apache.rocketmq.connect.http.HttpSinkTask",
+ "url":"${url}",
+ "method":"${method}",
+ "connect.topicnames":"${connect.topicnames}"
+}
```
-例子
+例子
+```
+http://localhost:8081/connectors/httpSinkConnector?config={"connector-class":"org.apache.rocketmq.connect.http.HttpSinkTask","connect-topicname" : "http-topic","url":"192.168.1.2"}
```
-http://localhost:8081/connectors/httpConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname" : "http-topic","url":"192.168.1.2"}
+```在请求中定义http header、query、body参数
+http://127.0.0.1:8082/connectors/httpSinkConnector?config={"connector.class":"org.apache.rocketmq.connect.http.HttpSinkConnector","url":"http://localhost:8080/api","timeout":"6000","connect.topicnames":"fileTopic","headerParameters":"{\"header1k\":\"header1v\"}","method":"POST","queryParameters":"{\"queryk1\":\"queryv1\"}"}
```
+更多参数见[rocketmq-connect-http 参数说明](#rocketmq-connect-http-参数说明)
+
>**注:** `rocketmq-http-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
## rocketmq-connect-http 停止
@@ -35,8 +57,31 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-connector-name}/
## rocketmq-connect-http 参数说明
* **http-sink-connector 参数说明**
-| KEY | TYPE | Must be filled | Description | Example
-|-----|---------|----------------|-------------|------------------|
-| url | String | YES | sink端 域名地址 | http://127.0.0.1 |
-|connect-topicname | String | YES | sink需要处理数据消息topic | xxxx |
-
+| KEY | TYPE | Must be filled | Description | Example |
+|-----------------------|--------|----------------|------------------------------------------------|---------------------------|
+| connect-topicname | String | YES | sink需要处理数据消息topic | fileTopic |
+| url | String | YES | 目标端url地址 | http://localhost:8080/api |
+| method | String | YES | http请求方法 | POST |
+| body | String | No | http请求body字段,不填时默认使用事件的Data字段 | POST |
+| headerParameters | String | NO | http请求header map动态参数Json字符串 | {"key1":"value1"} |
+| fixedHeaderParameters | String | NO | http请求header map静态参数Json字符串 | {"key1":"value1"} |
+| queryParameters | String | NO | http请求query map动态参数Json字符串 | {"key1":"value1"} |
+| fixedQueryParameters | String | NO | http请求query map静态参数Json字符串 | {"key1":"value1"} |
+| socks5UserName | String | NO | sock5代理用户名 | ***** |
+| socks5Password | String | NO | sock5代理密码 | ***** |
+| socks5Endpoint | String | NO | sock5代理地址 | http://localhost:7000 |
+| timeout | String | NO | http请求超时时间(毫秒) | 3000 |
+| concurrency | String | NO | http请求并发数 | 1 |
+| authType | String | NO | 认证方式 (BASIC_AUTH/OAUTH_AUTH/API_KEY_AUTH/NONE) | BASIC_AUTH |
+| basicUsername | String | NO | basic auth username | ***** |
+| basicPassword | String | NO | basic auth password | ***** |
+| apiKeyUsername | String | NO | api key auth username | ***** |
+| apiKeyPassword | String | NO | api key auth password | ***** |
+| oAuthEndpoint | String | NO | oauth 地址 | http://localhost:7000 |
+| oAuthHttpMethod | String | NO | oauth http请求方法 | GET |
+| oAuthClientId | String | NO | oauth client id | xxxx |
+| oAuthClientSecret | String | NO | oauth client secret | xxxx |
+| oAuthHeaderParameters | String | NO | oauth header map参数Json字符串 | {"key1":"value1"} |
+| oAuthQueryParameters | String | NO | oauth query map参数Json字符串 | {"key1":"value1"} |
+| oAuthBody | String | NO | oauth body参数 | bodyData |
+| token | String | NO | http请求token,如果非空,会添加到http请求的header中,key为token | xxxx |
diff --git a/connectors/rocketmq-connect-http/pom.xml b/connectors/rocketmq-connect-http/pom.xml
index ba8a8b85d..6fcd1d366 100644
--- a/connectors/rocketmq-connect-http/pom.xml
+++ b/connectors/rocketmq-connect-http/pom.xml
@@ -141,6 +141,49 @@
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ 3.1.2
+
+
+ verify
+ verify
+
+ ../../style/rmq_checkstyle.xml
+ UTF-8
+ true
+ true
+ false
+ false
+
+
+ check
+
+
+
+
+
+ maven-checkstyle-plugin
+ 3.1.2
+
+
+ verify
+ verify
+
+ ../../style/rmq_checkstyle.xml
+ UTF-8
+ true
+ true
+ false
+ false
+
+
+ check
+
+
+
+
@@ -197,6 +240,41 @@
commons-lang3
${commons-lang3.version}
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.13
+ compile
+
+
+ com.google.code.gson
+ gson
+ 2.8.9
+ compile
+
+
+ io.netty
+ netty-all
+ 4.1.25.Final
+ compile
+
+
+ com.google.guava
+ guava
+ 31.1-jre
+ compile
+
+
+ org.apache.httpcomponents
+ httpmime
+ 4.4.1
+
+
+ org.projectlombok
+ lombok
+ 1.18.20
+ compile
+
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/HttpConfig.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/HttpConfig.java
new file mode 100644
index 000000000..39e14b911
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/HttpConfig.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.http;
+
+import org.apache.rocketmq.connect.http.constant.HttpConstant;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class HttpConfig {
+ public static final Set REQUEST_CONFIG = new HashSet() {
+ {
+ add(HttpConstant.URL);
+ add(HttpConstant.METHOD);
+ }
+ };
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/HttpSinkConnector.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/HttpSinkConnector.java
new file mode 100644
index 000000000..8db9a99e9
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/HttpSinkConnector.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.http;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.connector.api.errors.ConnectException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * http sink connector
+ */
+public class HttpSinkConnector extends SinkConnector {
+
+ @Override
+ public Class extends Task> taskClass() {
+ return HttpSinkTask.class;
+ }
+
+ private KeyValue config;
+
+ @Override public void validate(KeyValue config) {
+ for (String requestKey : HttpConfig.REQUEST_CONFIG) {
+ if (!config.containsKey(requestKey)) {
+ throw new ConnectException("Request config key: " + requestKey);
+ }
+ }
+ }
+
+ @Override public void start(KeyValue config) {
+ this.config = config;
+ }
+
+ @Override public void stop() {
+ this.config = null;
+ }
+
+
+ @Override public List taskConfigs(int maxTasks) {
+ List config = new ArrayList<>();
+ config.add(this.config);
+ return config;
+ }
+}
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/HttpSinkTask.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/HttpSinkTask.java
new file mode 100644
index 000000000..d8a220c97
--- /dev/null
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/HttpSinkTask.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.http;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.connector.api.errors.RetriableException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.http.auth.Auth;
+import org.apache.rocketmq.connect.http.auth.BasicAuthImpl;
+import org.apache.rocketmq.connect.http.auth.ApiKeyAuthImpl;
+import org.apache.rocketmq.connect.http.auth.OAuthClientImpl;
+import org.apache.rocketmq.connect.http.constant.AuthTypeEnum;
+import org.apache.rocketmq.connect.http.constant.HttpConstant;
+import org.apache.rocketmq.connect.http.auth.ApacheHttpClientImpl;
+import org.apache.rocketmq.connect.http.auth.AbstractHttpClient;
+import org.apache.rocketmq.connect.http.auth.HttpCallback;
+import org.apache.rocketmq.connect.http.entity.HttpBasicAuthParameters;
+import org.apache.rocketmq.connect.http.entity.HttpApiKeyAuthParameters;
+import org.apache.rocketmq.connect.http.entity.HttpAuthParameters;
+import org.apache.rocketmq.connect.http.entity.HttpOAuthParameters;
+import org.apache.rocketmq.connect.http.entity.HttpRequest;
+import org.apache.rocketmq.connect.http.entity.ProxyConfig;
+import org.apache.rocketmq.connect.http.util.CheckUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class HttpSinkTask extends SinkTask {
+ private static final Logger log = LoggerFactory.getLogger(HttpSinkTask.class);
+ protected static final int DEFAULT_CONSUMER_TIMEOUT_SECONDS = 30;
+ protected static final String DEFAULT_REQUEST_TIMEOUT_MILL_SECONDS = "3000";
+ protected static final int DEFAULT_OAUTH_DELAY_SECONDS = 1;
+ protected static final int DEFAULT_CONCURRENCY = 1;
+
+ private String url;
+ private String method;
+ private String headerParameters;
+ private String fixedHeaderParameters;
+ private String queryParameters;
+ private String fixedQueryParameters;
+ private String body;
+ private String socks5UserName;
+ private String socks5Password;
+ private String socks5Endpoint;
+ private String timeout;
+ private String concurrency;
+ private String authType;
+ private String basicUsername;
+ private String basicPassword;
+ private String apiKeyUsername;
+ private String apiKeyPassword;
+ private String oAuthEndpoint;
+ private String oAuthHttpMethod;
+ private String oAuthClientId;
+ private String oAuthClientSecret;
+ private String oAuthHeaderParameters;
+ private String oAuthQueryParameters;
+ private String oAuthBody;
+ private String token;
+
+ private Long awaitTimeoutSeconds;
+ private Auth auth;
+ private HttpAuthParameters httpAuthParameters;
+ private ScheduledExecutorService scheduledExecutorService;
+ private AbstractHttpClient httpClient;
+
+ @Override
+ public void put(List records) throws ConnectException {
+ try {
+ Long startTime = System.currentTimeMillis();
+ CountDownLatch countDownLatch = new CountDownLatch(records.size());
+ HttpCallback httpCallback = new HttpCallback(countDownLatch);
+ for (ConnectRecord connectRecord : records) {
+ // body
+ // header
+ Map headerMap = renderHeaderMap(headerParameters, fixedHeaderParameters, token);
+ if (auth != null) {
+ headerMap.putAll(auth.auth());
+ }
+ // render query to url
+ String urlWithQueryParameters = renderQueryParametersToUrl(url, queryParameters, fixedQueryParameters);
+ HttpRequest httpRequest = new HttpRequest();
+ httpRequest.setUrl(urlWithQueryParameters);
+ httpRequest.setMethod(method);
+ httpRequest.setHeaderMap(headerMap);
+ if (body != null) {
+ httpRequest.setBody(body);
+ } else {
+ httpRequest.setBody(new Gson().toJson(connectRecord.getData()));
+ }
+ httpRequest.setTimeout(StringUtils.isNotBlank(timeout) ? timeout : DEFAULT_REQUEST_TIMEOUT_MILL_SECONDS);
+ log.info("HttpSinkTask send request | url:{} | method: {} | headerMap: {} | body: {}",
+ httpRequest.getUrl(), httpRequest.getMethod(), httpRequest.getHeaderMap(), httpRequest.getBody());
+ httpClient.executeNotReturn(httpRequest, httpCallback);
+ }
+ boolean consumeSucceed = Boolean.FALSE;
+ try {
+ consumeSucceed = countDownLatch.await(awaitTimeoutSeconds, TimeUnit.SECONDS);
+ } catch (Throwable e) {
+ log.error("count down latch failed.", e);
+ }
+ if (!consumeSucceed) {
+ throw new RetriableException("Request Timeout");
+ }
+ if (httpCallback.isFailed()) {
+ throw new RetriableException(httpCallback.getMsg());
+ }
+ log.info("HttpSinkTask put size:{},cost:{}", records.size(), System.currentTimeMillis() - startTime);
+ } catch (Exception e) {
+ log.error("HttpSinkTask | put | error => ", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Map renderHeaderMap(String headerParameters, String fixedHeaderParameters, String token) {
+ Map headerMap = new HashMap<>();
+ if (headerParameters != null) {
+ Map userDefinedHeaders = new Gson().fromJson(headerParameters, new TypeToken