Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update http #384

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions connectors/rocketmq-connect-http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@ mvn clean install -Dmaven.test.skip=true

```
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-sink-connector-name}
?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname" : "${connect-topicname}","url":"${url}"}
?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"HttpSinkConnector",
"urlPattern":"${urlPattern}","method":"${method}","queryStringParameters":"${queryStringParameters}","headerParameters":"${headerParameters}","bodys":"${bodys}","authType":"${authType}","basicUser":"${basicUser}","basicPassword":"${basicPassword}",
"oauth2Endpoint":"${oauth2Endpoint}","oauth2ClientId":"${oauth2ClientId}","oauth2ClientSecret":"${oauth2ClientSecret}","oauth2HttpMethod":"${oauth2HttpMethod}","proxyType":"${proxyType}","proxyHost":"${proxyHost}","proxyPort":"${proxyPort}","proxyUser":"${proxyUser}",
"proxyPort":"${proxyPort}","proxyPort":"${proxyPort}","proxyUser":"${proxyUser}","proxyPassword":"${proxyPassword}","apiKeyName":"${apiKeyName}","apiKeyValue":"${apiKeyValue}","timeout":"${timeout}"}
```

例子
例子
```
http://localhost:8081/connectors/httpConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
"connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname" : "http-topic","url":"192.168.1.2"}
"connector-class":"HttpSinkConnector","urlPattern":"http://127.0.0.1","method":"POST","queryStringParameters":"","headerParameters":"","bodys":"{"id" : "234"}","authType":"BASIC_AUTH","basicUser":"","basicPassword":"",
"oauth2Endpoint":"","oauth2ClientId":"","oauth2ClientSecret":"","oauth2HttpMethod":"","proxyType":"","proxyHost":"","proxyPort":"","proxyUser":"",
"proxyPort":"","proxyPort":"","proxyUser":"","proxyPassword":"","apiKeyName":"","apiKeyValue":"","timeout":"6000"}
```

>**注:** `rocketmq-http-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
Expand All @@ -35,8 +40,25 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-connector-name}/
## rocketmq-connect-http 参数说明
* **http-sink-connector 参数说明**

| KEY | TYPE | Must be filled | Description | Example
|-----|---------|----------------|-------------|------------------|
| url | String | YES | sink端 域名地址 | http://127.0.0.1 |
|connect-topicname | String | YES | sink需要处理数据消息topic | xxxx |

| KEY | TYPE | Must be filled | Description | Example
|-----------------------|---------|----------------|----------------|------------------|
| urlPattern | String | YES | sink端 域名地址 | http://127.0.0.1 |
| method | String | YES | 请求类型 | POST、GET |
| queryStringParameters | String | NO | 请求参数 | xxxx |
| headerParameters | String | NO | 请求头 | xxxx |
| bodys | String | NO | 请求体 | xxxx |
| authType | String | NO | 权限类型 | BASIC_AUTH、OAUTH_AUTH、API_KEY_AUTH |
| basicUser | String | NO | 用户名 | xxxx |
| basicPassword | String | NO | 密码 | xxxx |
| oauth2Endpoint | String | NO | OAuth获取token地址 | http://127.0.0.1 |
| oauth2ClientId | String | NO | clientId | xxxx |
| oauth2ClientSecret | String | NO | client secret | xxxx |
| oauth2HttpMethod | String | NO | oauth的请求类型 | xxxx |
| proxyType | String | NO | 代理类型 | xxxx |
| proxyHost | String | NO | 代理地址 | xxxx |
| proxyPort | String | NO | 代理端口 | xxxx |
| proxyUser | String | NO | 代理的访问的用户名 | xxxx |
| proxyPassword | String | NO | 代理访问的密码 | xxxx |
| apiKeyName | String | NO | auth api key | xxxx |
| apiKeyValue | String | NO | auth api value | xxxx |
| timeout | String | NO | 超时时间 | xxxx |
49 changes: 37 additions & 12 deletions connectors/rocketmq-connect-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@
<maven.compiler.target>8</maven.compiler.target>
<slf4j.version>1.7.7</slf4j.version>
<logback.version>1.2.9</logback.version>
<gson.version>2.9.0</gson.version>
<junit.version>4.13.1</junit.version>
<assertj.version>2.6.0</assertj.version>
<mockito.version>2.6.3</mockito.version>
<openmessaging-connector.version>0.1.2-SNAPSHOT</openmessaging-connector.version>
<gson.version>2.10</gson.version>
<junit.version>4.13.2</junit.version>
<assertj.version>3.23.1</assertj.version>
<mockito.version>4.8.0</mockito.version>
<openmessaging-connector.version>0.1.5-SNAPSHOT</openmessaging-connector.version>
<okhttp.version>3.9.1</okhttp.version>
<fastjson.version>1.2.83</fastjson.version>
<fastjson.version>2.0.19</fastjson.version>
<commons-lang3.version>3.12.0</commons-lang3.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<guava.version>31.1-jre</guava.version>
<httpclient.version>4.4.1</httpclient.version>
<netty-all.version>4.1.85.Final</netty-all.version>
</properties>

<build>
Expand Down Expand Up @@ -183,19 +186,41 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty-all.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,63 @@
package org.apache.rocketmq.connect.http.sink;

import org.apache.rocketmq.connect.http.sink.constant.HttpConstant;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.component.task.sink.SinkConnector;
import io.openmessaging.internal.DefaultKeyValue;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.connect.http.sink.constant.HttpConstant;
import org.apache.rocketmq.connect.http.sink.util.CheckUtils;

import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.List;

public class HttpSinkConnector extends SinkConnector {

private String url;

@Override
public void pause() {

}
protected String urlPattern;
protected String method;
protected String queryStringParameters;
protected String headerParameters;
protected String bodys;
protected String authType;
protected String basicUser;
protected String basicPassword;
protected String oauth2Endpoint;
protected String oauth2ClientId;
protected String oauth2ClientSecret;
protected String oauth2HttpMethod;
protected String proxyType;
protected String proxyHost;
protected String proxyPort;
protected String proxyUser;
protected String proxyPassword;
protected String timeout;
protected String apiKeyName;
protected String apiKeyValue;

@Override
public void resume() {

}

@Override
public List<KeyValue> taskConfigs(int maxTasks) {
List<KeyValue> keyValueList = new ArrayList<>(11);
KeyValue keyValue = new DefaultKeyValue();
keyValue.put(HttpConstant.URL_CONSTANT, url);
keyValue.put(HttpConstant.URL_PATTERN_CONSTANT, urlPattern);
keyValue.put(HttpConstant.METHOD_CONSTANT, method);
keyValue.put(HttpConstant.QUERY_STRING_PARAMETERS_CONSTANT, queryStringParameters);
keyValue.put(HttpConstant.HEADER_PARAMETERS_CONSTANT, headerParameters);
keyValue.put(HttpConstant.BODYS_CONSTANT, bodys);
keyValue.put(HttpConstant.AUTH_TYPE_CONSTANT, authType);
keyValue.put(HttpConstant.BASIC_USER_CONSTANT, basicUser);
keyValue.put(HttpConstant.BASIC_PASSWORD_CONSTANT, basicPassword);
keyValue.put(HttpConstant.OAUTH2_ENDPOINT_CONSTANT, oauth2Endpoint);
keyValue.put(HttpConstant.OAUTH2_CLIENTID_CONSTANT, oauth2ClientId);
keyValue.put(HttpConstant.OAUTH2_CLIENTSECRET_CONSTANT, oauth2ClientSecret);
keyValue.put(HttpConstant.OAUTH2_HTTP_METHOD_CONSTANT, oauth2HttpMethod);
keyValue.put(HttpConstant.PROXY_TYPE_CONSTANT, proxyType);
keyValue.put(HttpConstant.PROXY_HOST_CONSTANT, proxyHost);
keyValue.put(HttpConstant.PROXY_PORT_CONSTANT, proxyPort);
keyValue.put(HttpConstant.PROXY_USER_CONSTANT, proxyUser);
keyValue.put(HttpConstant.PROXY_PASSWORD_CONSTANT, proxyPassword);
keyValue.put(HttpConstant.TIMEOUT_CONSTANT, timeout);
keyValue.put(HttpConstant.API_KEY_NAME, apiKeyName);
keyValue.put(HttpConstant.API_KEY_VALUE, apiKeyValue);
keyValueList.add(keyValue);
return keyValueList;
}
Expand All @@ -42,24 +69,35 @@ public Class<? extends Task> taskClass() {

@Override
public void validate(KeyValue config) {
if (StringUtils.isBlank(config.getString(HttpConstant.URL_CONSTANT))) {
throw new RuntimeException("http required parameter is null !");
}
try {
URL urlConnect = new URL(config.getString(HttpConstant.URL_CONSTANT));
URLConnection urlConnection = urlConnect.openConnection();
urlConnection.setConnectTimeout(5000);
urlConnection.connect();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}

@Override
public void init(KeyValue config) {
url = config.getString(HttpConstant.URL_CONSTANT);
public void start(KeyValue config) {
urlPattern = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.URL_PATTERN_CONSTANT));
method = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.METHOD_CONSTANT));
queryStringParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.QUERY_STRING_PARAMETERS_CONSTANT));
headerParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.HEADER_PARAMETERS_CONSTANT));
bodys = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BODYS_CONSTANT));
authType = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.AUTH_TYPE_CONSTANT));
basicUser = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BASIC_USER_CONSTANT));
basicPassword = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BASIC_PASSWORD_CONSTANT));
oauth2Endpoint = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_ENDPOINT_CONSTANT));
oauth2ClientId = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_CLIENTID_CONSTANT));
oauth2ClientSecret = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_CLIENTSECRET_CONSTANT));
oauth2HttpMethod = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH2_HTTP_METHOD_CONSTANT));
proxyType = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_TYPE_CONSTANT));
proxyHost = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_HOST_CONSTANT));
proxyPort = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_PORT_CONSTANT));
proxyUser = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_USER_CONSTANT));
proxyPassword = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.PROXY_PASSWORD_CONSTANT));
timeout = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.TIMEOUT_CONSTANT));
apiKeyName = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.API_KEY_NAME));
apiKeyValue = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.API_KEY_VALUE));
queryStringParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.QUERY_STRING_PARAMETERS_CONSTANT));
headerParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.HEADER_PARAMETERS_CONSTANT));
}


@Override
public void stop() {

Expand Down
Loading