Skip to content

Commit

Permalink
enrich rocketmq-connect-http (#529)
Browse files Browse the repository at this point in the history
  • Loading branch information
ingdex authored Mar 22, 2024
1 parent 8dcf2b3 commit 02a168c
Show file tree
Hide file tree
Showing 38 changed files with 3,427 additions and 438 deletions.
73 changes: 59 additions & 14 deletions connectors/rocketmq-connect-http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,48 @@
Be responsible for consuming messages from producer and writing data to another web service system.
```

## rocketmq-connect-http 打包
```
mvn clean install -Dmaven.test.skip=true
```
## rocketmq-connect-http使用方法

1. 进入想要使用的connectors目录下(以rocketmq-connect-http目录为例),使用以下指令将插件进行打包
```shell
mvn clean package -Dmaven.test.skip=true
```
2. 打包好的插件以jar包的模式出现在`rocketmq-connect-http/target/`目录下

3.`distribution/conf`目录下找的对应的配置文件进行更新,对于standalone的启动方式,更新`connect-standalone.conf`文件中的`pluginPaths`变量

```
pluginPaths=(you plugin path)
```

相应的,使用distributed启动方式,则更新`connect-distributed.conf`中的变量
4. 创建并启动对应的`SourceConnector`以及`SinkConnector`


## rocketmq-connect-http 启动

* **http-sink-connector** 启动

```
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-sink-connector-name}
?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname" : "${connect-topicname}","url":"${url}"}
POST http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-source-connector-name}
{
"connector.class":"org.apache.rocketmq.connect.http.HttpSinkTask",
"url":"${url}",
"method":"${method}",
"connect.topicnames":"${connect.topicnames}"
}
```

例子
例子
```
http://localhost:8081/connectors/httpSinkConnector?config={"connector-class":"org.apache.rocketmq.connect.http.HttpSinkTask","connect-topicname" : "http-topic","url":"192.168.1.2"}
```
http://localhost:8081/connectors/httpConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
"connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname" : "http-topic","url":"192.168.1.2"}
```在请求中定义http header、query、body参数
http://127.0.0.1:8082/connectors/httpSinkConnector?config={"connector.class":"org.apache.rocketmq.connect.http.HttpSinkConnector","url":"http://localhost:8080/api","timeout":"6000","connect.topicnames":"fileTopic","headerParameters":"{\"header1k\":\"header1v\"}","method":"POST","queryParameters":"{\"queryk1\":\"queryv1\"}"}
```

更多参数见[rocketmq-connect-http 参数说明](#rocketmq-connect-http-参数说明)

>**注:** `rocketmq-http-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
## rocketmq-connect-http 停止
Expand All @@ -35,8 +57,31 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-connector-name}/
## rocketmq-connect-http 参数说明
* **http-sink-connector 参数说明**

| KEY | TYPE | Must be filled | Description | Example
|-----|---------|----------------|-------------|------------------|
| url | String | YES | sink端 域名地址 | http://127.0.0.1 |
|connect-topicname | String | YES | sink需要处理数据消息topic | xxxx |

| KEY | TYPE | Must be filled | Description | Example |
|-----------------------|--------|----------------|------------------------------------------------|---------------------------|
| connect-topicname | String | YES | sink需要处理数据消息topic | fileTopic |
| url | String | YES | 目标端url地址 | http://localhost:8080/api |
| method | String | YES | http请求方法 | POST |
| body | String | No | http请求body字段,不填时默认使用事件的Data字段 | POST |
| headerParameters | String | NO | http请求header map动态参数Json字符串 | {"key1":"value1"} |
| fixedHeaderParameters | String | NO | http请求header map静态参数Json字符串 | {"key1":"value1"} |
| queryParameters | String | NO | http请求query map动态参数Json字符串 | {"key1":"value1"} |
| fixedQueryParameters | String | NO | http请求query map静态参数Json字符串 | {"key1":"value1"} |
| socks5UserName | String | NO | sock5代理用户名 | ***** |
| socks5Password | String | NO | sock5代理密码 | ***** |
| socks5Endpoint | String | NO | sock5代理地址 | http://localhost:7000 |
| timeout | String | NO | http请求超时时间(毫秒) | 3000 |
| concurrency | String | NO | http请求并发数 | 1 |
| authType | String | NO | 认证方式 (BASIC_AUTH/OAUTH_AUTH/API_KEY_AUTH/NONE) | BASIC_AUTH |
| basicUsername | String | NO | basic auth username | ***** |
| basicPassword | String | NO | basic auth password | ***** |
| apiKeyUsername | String | NO | api key auth username | ***** |
| apiKeyPassword | String | NO | api key auth password | ***** |
| oAuthEndpoint | String | NO | oauth 地址 | http://localhost:7000 |
| oAuthHttpMethod | String | NO | oauth http请求方法 | GET |
| oAuthClientId | String | NO | oauth client id | xxxx |
| oAuthClientSecret | String | NO | oauth client secret | xxxx |
| oAuthHeaderParameters | String | NO | oauth header map参数Json字符串 | {"key1":"value1"} |
| oAuthQueryParameters | String | NO | oauth query map参数Json字符串 | {"key1":"value1"} |
| oAuthBody | String | NO | oauth body参数 | bodyData |
| token | String | NO | http请求token,如果非空,会添加到http请求的header中,key为token | xxxx |
78 changes: 78 additions & 0 deletions connectors/rocketmq-connect-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,49 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.1.2</version>
<executions>
<execution>
<id>verify</id>
<phase>verify</phase>
<configuration>
<configLocation>../../style/rmq_checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<includeTestSourceDirectory>false</includeTestSourceDirectory>
<includeTestResources>false</includeTestResources>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.1.2</version>
<executions>
<execution>
<id>verify</id>
<phase>verify</phase>
<configuration>
<configLocation>../../style/rmq_checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<includeTestSourceDirectory>false</includeTestSourceDirectory>
<includeTestResources>false</includeTestResources>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
Expand Down Expand Up @@ -197,6 +240,41 @@
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.4.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>compile</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.connect.http;

import org.apache.rocketmq.connect.http.constant.HttpConstant;

import java.util.HashSet;
import java.util.Set;

public class HttpConfig {
public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
{
add(HttpConstant.URL);
add(HttpConstant.METHOD);
}
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.connect.http;

import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.component.task.sink.SinkConnector;
import io.openmessaging.connector.api.errors.ConnectException;

import java.util.ArrayList;
import java.util.List;

/**
* http sink connector
*/
public class HttpSinkConnector extends SinkConnector {

@Override
public Class<? extends Task> taskClass() {
return HttpSinkTask.class;
}

private KeyValue config;

@Override public void validate(KeyValue config) {
for (String requestKey : HttpConfig.REQUEST_CONFIG) {
if (!config.containsKey(requestKey)) {
throw new ConnectException("Request config key: " + requestKey);
}
}
}

@Override public void start(KeyValue config) {
this.config = config;
}

@Override public void stop() {
this.config = null;
}


@Override public List<KeyValue> taskConfigs(int maxTasks) {
List<KeyValue> config = new ArrayList<>();
config.add(this.config);
return config;
}
}
Loading

0 comments on commit 02a168c

Please sign in to comment.