Skip to content

Commit

Permalink
[ISSUES #509] Replicator remove runtime dependency (#510)
Browse files Browse the repository at this point in the history
* replicator remove runtime dependency

* change max.task to max.tasks

* task config add connector.class

* fix test case error
  • Loading branch information
odbozhou authored May 17, 2023
1 parent 367790b commit 85d9c03
Show file tree
Hide file tree
Showing 19 changed files with 85 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ PRIMARY KEY (`id`)
```
curl-X POST-H"Content-Type: application/json"http: //127.0.0.1:8082/connectors/MySQLCDCSource'{
"connector.class": "org.apache.rocketmq.connect.debezium.MySQL.DebeziumMySQLConnector",
"max.task": "1",
"max.tasks": "1",
"connect.topicname": "debezium-MySQL-source-topic",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
Expand Down Expand Up @@ -209,7 +209,7 @@ curl-X POST-H"Content-Type: application/json"http: //127.0.0.1:8082/connectors/M
```
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest -d '{
"connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
"max.task": "2",
"max.tasks": "2",
"connect.topicnames": "debezium-mysql-source",
"connection.url": "jdbc:mysql://数据库ip:3306/inventory_2",
"connection.user": "root",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ CREATE TABLE holding (
```
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector -d '{
"connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector",
"max.task": "1",
"max.tasks": "1",
"connect.topicname": "debezium-postgres-source-01",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
Expand All @@ -202,7 +202,7 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector
```
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest201 -d '{
"connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
"max.task": "2",
"max.tasks": "2",
"connect.topicnames": "debezium-postgres-source-01",
"connection.url": "jdbc:mysql://数据库ip:3306/bank1",
"connection.user": "root",
Expand Down
4 changes: 2 additions & 2 deletions connectors/rocketmq-connect-doris/README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Transaction concept not different in Mysql and Doris, so TCL is not supported.
POST http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-sink-connector-name}
{
"connector.class":"org.apache.rocketmq.connect.doris.connector.DorisSinkConnector",
"max.task":"1",
"max.tasks":"1",
"table.whitelist":"sink_test.doris_test_sink",
"connect.topicnames":"doris_test_sink",
"host":"xx.xx.xx.xx",
Expand Down Expand Up @@ -75,7 +75,7 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-doris-connector-name}
|port | String | YES |doris http port | 8030 |
|user | String | YES |doris user name | root |
|passwd | String | YES |doris passwd | passwd |
|max.task | Integer | NO |task number | 2 |
|max.tasks | Integer | NO |task number | 2 |
|key.converter | String | YES |data converter | org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter |
|value.converter | String | YES |value converter | org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter |
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ rocketmq connect runtime参数:
如果是SinkConnector,对应为org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector。

- **connector.class**: 要导入导出数据的rocketmq topic
- **max.task**: 启动的task数目
- **max.tasks**: 启动的task数目

kafka-connector参数放在kafka.connector.configs里,又分为2类:kafka-connector-adapter参数,以及 具体kafka connector

Expand Down
4 changes: 2 additions & 2 deletions connectors/rocketmq-replicator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ curl -X POST -H "Content-Type: application/json" http://${runtime-port}:${runtim
"src.secret.key": "${sk}",
"dest.topic": "${targetClusterTopic}",
"dest.access.key": "${ak}",
"max.task": "2",
"max.tasks": "2",
"src.topictags": "test1,*",
"src.acl.enable": "false",
"errors.tolerance": "all",
Expand Down Expand Up @@ -150,7 +150,7 @@ src.endpoint | String | Yes | namesrv address of source rocketmq cluster
src.topictags | String | Yes | source cluster topic and tag,${topic},{tag} | test1,* |
dest.topic | String | Yes | target cluster topic | test2 |
dest.endpoint | String | Yes | namesrv address of target rocketmq cluster | 127.0.0.1:9876 |
max.task | String | No | maximum number of tasks | 2 |
max.tasks | String | No | maximum number of tasks | 2 |
dest.acl.enable | String | No | acl switch,enumeration value : true/false | false |
dest.access.key | String | No | please refer to the RocketMQ ACL module,when dest.acl.enable is false, this parameter does not take effect | accesskey |
dest.secret.key | String | No | please refer to the RocketMQ ACL module,when dest.acl.enable is false, this parameter does not take effect | secretkey |
Expand Down
12 changes: 8 additions & 4 deletions connectors/rocketmq-replicator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,14 @@
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connect-runtime</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.replicator.common.LoggerName;
import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
import org.apache.rocketmq.replicator.exception.InitMQClientException;
import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
Expand Down Expand Up @@ -108,11 +107,11 @@ public Class<? extends Task> taskClass() {

@Override
public void validate(KeyValue config) {
if (config.getInt(ConnectorConfig.MAX_TASK, 1) > 1) {
if (config.getInt(ReplicatorConnectorConfig.MAX_TASK, 1) > 1) {
log.warn("ReplicatorCheckpointConnector no need to set max-task, only used 1.");
}
// checkpoint just need only one task.
config.put(ConnectorConfig.MAX_TASK, 1);
config.put(ReplicatorConnectorConfig.MAX_TASK, 1);
ReplicatorUtils.checkNeedParams(ReplicatorCheckpointConnector.class.getName(), config, neededParamKeys);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.replicator.common.LoggerName;
import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
import org.apache.rocketmq.replicator.exception.InitMQClientException;
import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -189,7 +188,6 @@ public List<ConnectRecord> poll() throws InterruptedException {
return connectRecords;
}

@NotNull
private static Struct buildCheckpointPayload(String srcTopicWithInstanceId, String srcConsumerGroupWithInstanceId,
long minSrcLasttimestamp, long minDestLasttimestamp) {
Struct struct = new Struct(VALUE_SCHEMA_V0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
import org.apache.rocketmq.replicator.utils.ReplicatorUtils;

Expand All @@ -31,7 +30,7 @@
import java.util.List;
import java.util.Set;

import static org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig.CONNECT_TOPICNAME;
import static org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig.CONNECT_TOPICNAME;

/**
* @author osgoo
Expand Down Expand Up @@ -78,11 +77,11 @@ public Class<? extends Task> taskClass() {

@Override
public void validate(KeyValue config) {
if (config.getInt(ConnectorConfig.MAX_TASK) > 1) {
if (config.getInt(ReplicatorConnectorConfig.MAX_TASK) > 1) {
log.warn("ReplicatorHeartbeatConnector no need to set max-task, only used 1.");
}
// heartbeat just need only one task.
config.put(ConnectorConfig.MAX_TASK, 1);
config.put(ReplicatorConnectorConfig.MAX_TASK, 1);
if (StringUtils.isNotBlank(config.getString(CONNECT_TOPICNAME))) {
log.warn("ReplicatorHeartbeatConnector no need to set " + CONNECT_TOPICNAME + ", use " + ReplicatorConnectorConfig.DEST_TOPIC + " instead.");
// use destInstanceId % destTopic for sink instead of CONNECT_TOPICNAME
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.replicator.stats.ReplicatorTaskStats;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.slf4j.Logger;
Expand Down Expand Up @@ -135,7 +134,6 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
long storeTimestamp = messageExt.getStoreTimestamp();
long consumeTimestamp = System.currentTimeMillis();
long rt = consumeTimestamp - bornTimestamp;
ReplicatorTaskStats.incItemValue(ReplicatorTaskStats.REPLICATOR_HEARTBEAT_DELAY_MS, connectorConfig.getConnectorId(), (int) rt, 1);
log.info(messageExt.getUserProperty("src") + " --> " + messageExt.getUserProperty("dest") + " RT " + bornTimestamp + "," + storeTimestamp + "," + consumeTimestamp);
consumerLastConsumeOk = consumeTimestamp;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
Expand Down Expand Up @@ -201,7 +199,6 @@ public void start(KeyValue config) {
log.info("ReplicatorHeartbeatTask connectorConfig : " + connectorConfig);

try {
ReplicatorTaskStats.init();
// init consumer group
String destClusterName = connectorConfig.getDestCluster();
createAndUpdatePullConsumerGroup(destClusterName, consumeGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,11 @@
import org.apache.rocketmq.replicator.exception.GetMetaDataException;
import org.apache.rocketmq.replicator.exception.InitMQClientException;
import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
import org.apache.rocketmq.connect.runtime.errors.ToleranceType;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

import static org.apache.rocketmq.connect.runtime.config.ConnectorConfig.CONNECTOR_ID;
import static org.apache.rocketmq.connect.runtime.config.ConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig.CONNECT_TOPICNAME;
import static org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig.CONNECT_TOPICNAME;


/**
* @author osgoo
Expand Down Expand Up @@ -193,8 +191,8 @@ public List<KeyValue> taskConfigs(int maxTasks) {
keyValue.put(ReplicatorConnectorConfig.DIVIDED_NORMAL_QUEUES, JSON.toJSONString(normalDivided.get(i)));

// CONNECTOR_ID is not fulfilled by rebalance
keyValue.put(CONNECTOR_ID, connectorConfig.getString(CONNECTOR_ID));
keyValue.put(ERRORS_TOLERANCE_CONFIG, connectorConfig.getString(ERRORS_TOLERANCE_CONFIG, ToleranceType.ALL.name()));
// keyValue.put(CONNECTOR_ID, connectorConfig.getString(CONNECTOR_ID));
// keyValue.put(ERRORS_TOLERANCE_CONFIG, connectorConfig.getString(ERRORS_TOLERANCE_CONFIG, ToleranceType.ALL.name()));
keyValue.put(ReplicatorConnectorConfig.SRC_CLOUD, connectorConfig.getString(ReplicatorConnectorConfig.SRC_CLOUD));
keyValue.put(ReplicatorConnectorConfig.SRC_REGION, connectorConfig.getString(ReplicatorConnectorConfig.SRC_REGION));
keyValue.put(ReplicatorConnectorConfig.SRC_CLUSTER, connectorConfig.getString(ReplicatorConnectorConfig.SRC_CLUSTER));
Expand Down Expand Up @@ -257,7 +255,6 @@ public Class<? extends Task> taskClass() {
add(ReplicatorConnectorConfig.DEST_ENDPOINT);
add(ReplicatorConnectorConfig.SRC_ACL_ENABLE);
add(ReplicatorConnectorConfig.DEST_ACL_ENABLE);
add(ERRORS_TOLERANCE_CONFIG);
}
};

Expand Down
Loading

0 comments on commit 85d9c03

Please sign in to comment.