From 85d9c0366b3bff26e95b6feefdb585603d4938b4 Mon Sep 17 00:00:00 2001
From: zhoubo <877036922@qq.com>
Date: Wed, 17 May 2023 16:37:56 +0800
Subject: [PATCH] [ISSUES #509] Replicator remove runtime dependency (#510)
* replicator remove runtime dependency
* change max.task to max.tasks
* task config add connector.class
* fix test case error
---
.../rocketmq-connect-debezium-mysql/README.md | 4 +-
.../README.md | 4 +-
connectors/rocketmq-connect-doris/README.txt | 4 +-
.../README.md | 2 +-
connectors/rocketmq-replicator/README.md | 4 +-
connectors/rocketmq-replicator/pom.xml | 12 ++-
.../ReplicatorCheckpointConnector.java | 7 +-
.../replicator/ReplicatorCheckpointTask.java | 4 +-
.../ReplicatorHeartbeatConnector.java | 7 +-
.../replicator/ReplicatorHeartbeatTask.java | 3 -
.../replicator/ReplicatorSourceConnector.java | 11 +--
.../replicator/ReplicatorSourceTask.java | 40 ++-------
.../replicator/common/LoggerName.java | 27 ++++++
.../config/ReplicatorConnectorConfig.java | 7 +-
.../replicator/stats/ReplicatorTaskStats.java | 82 -------------------
.../replicator/utils/ReplicatorUtils.java | 16 +++-
.../AbstractConfigManagementService.java | 1 +
.../ConnectKeyValueDeserializerTest.java | 4 +-
.../store/ConnectKeyValueSerializerTest.java | 2 +-
19 files changed, 85 insertions(+), 156 deletions(-)
create mode 100644 connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/LoggerName.java
delete mode 100644 connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/stats/ReplicatorTaskStats.java
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/README.md b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/README.md
index 3997b2d5a..8a0ac7a9d 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/README.md
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/README.md
@@ -175,7 +175,7 @@ PRIMARY KEY (`id`)
```
curl-X POST-H"Content-Type: application/json"http: //127.0.0.1:8082/connectors/MySQLCDCSource'{
"connector.class": "org.apache.rocketmq.connect.debezium.MySQL.DebeziumMySQLConnector",
-"max.task": "1",
+"max.tasks": "1",
"connect.topicname": "debezium-MySQL-source-topic",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
@@ -209,7 +209,7 @@ curl-X POST-H"Content-Type: application/json"http: //127.0.0.1:8082/connectors/M
```
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest -d '{
"connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
- "max.task": "2",
+ "max.tasks": "2",
"connect.topicnames": "debezium-mysql-source",
"connection.url": "jdbc:mysql://数据库ip:3306/inventory_2",
"connection.user": "root",
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/README.md b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/README.md
index e0f7cb470..97fc188d1 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/README.md
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/README.md
@@ -175,7 +175,7 @@ CREATE TABLE holding (
```
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector -d '{
"connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector",
- "max.task": "1",
+ "max.tasks": "1",
"connect.topicname": "debezium-postgres-source-01",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
@@ -202,7 +202,7 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector
```
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest201 -d '{
"connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
- "max.task": "2",
+ "max.tasks": "2",
"connect.topicnames": "debezium-postgres-source-01",
"connection.url": "jdbc:mysql://数据库ip:3306/bank1",
"connection.user": "root",
diff --git a/connectors/rocketmq-connect-doris/README.txt b/connectors/rocketmq-connect-doris/README.txt
index 75025bb9b..3b530e116 100644
--- a/connectors/rocketmq-connect-doris/README.txt
+++ b/connectors/rocketmq-connect-doris/README.txt
@@ -40,7 +40,7 @@ Transaction concept not different in Mysql and Doris, so TCL is not supported.
POST http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-sink-connector-name}
{
"connector.class":"org.apache.rocketmq.connect.doris.connector.DorisSinkConnector",
- "max.task":"1",
+ "max.tasks":"1",
"table.whitelist":"sink_test.doris_test_sink",
"connect.topicnames":"doris_test_sink",
"host":"xx.xx.xx.xx",
@@ -75,7 +75,7 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-doris-connector-name}
|port | String | YES |doris http port | 8030 |
|user | String | YES |doris user name | root |
|passwd | String | YES |doris passwd | passwd |
-|max.task | Integer | NO |task number | 2 |
+|max.tasks | Integer | NO |task number | 2 |
|key.converter | String | YES |data converter | org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter |
|value.converter | String | YES |value converter | org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter |
```
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/README.md b/connectors/rocketmq-connect-kafka-connector-adapter/README.md
index 98522f876..f60cbceed 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/README.md
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/README.md
@@ -14,7 +14,7 @@ rocketmq connect runtime参数:
如果是SinkConnector,对应为org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector。
- **connector.class**: 要导入导出数据的rocketmq topic
-- **max.task**: 启动的task数目
+- **max.tasks**: 启动的task数目
kafka-connector参数放在kafka.connector.configs里,又分为2类:kafka-connector-adapter参数,以及 具体kafka connector
diff --git a/connectors/rocketmq-replicator/README.md b/connectors/rocketmq-replicator/README.md
index 21d64945a..a08482906 100644
--- a/connectors/rocketmq-replicator/README.md
+++ b/connectors/rocketmq-replicator/README.md
@@ -38,7 +38,7 @@ curl -X POST -H "Content-Type: application/json" http://${runtime-port}:${runtim
"src.secret.key": "${sk}",
"dest.topic": "${targetClusterTopic}",
"dest.access.key": "${ak}",
- "max.task": "2",
+ "max.tasks": "2",
"src.topictags": "test1,*",
"src.acl.enable": "false",
"errors.tolerance": "all",
@@ -150,7 +150,7 @@ src.endpoint | String | Yes | namesrv address of source rocketmq cluster
src.topictags | String | Yes | source cluster topic and tag,${topic},{tag} | test1,* |
dest.topic | String | Yes | target cluster topic | test2 |
dest.endpoint | String | Yes | namesrv address of target rocketmq cluster | 127.0.0.1:9876 |
-max.task | String | No | maximum number of tasks | 2 |
+max.tasks | String | No | maximum number of tasks | 2 |
dest.acl.enable | String | No | acl switch,enumeration value : true/false | false |
dest.access.key | String | No | please refer to the RocketMQ ACL module,when dest.acl.enable is false, this parameter does not take effect | accesskey |
dest.secret.key | String | No | please refer to the RocketMQ ACL module,when dest.acl.enable is false, this parameter does not take effect | secretkey |
diff --git a/connectors/rocketmq-replicator/pom.xml b/connectors/rocketmq-replicator/pom.xml
index 579df94f4..8e893b04a 100644
--- a/connectors/rocketmq-replicator/pom.xml
+++ b/connectors/rocketmq-replicator/pom.xml
@@ -145,10 +145,14 @@
compile
- org.apache.rocketmq
- rocketmq-connect-runtime
- 0.0.1-SNAPSHOT
- compile
+ org.apache.commons
+ commons-lang3
+ 3.12.0
+
+
+ org.apache.commons
+ commons-collections4
+ 4.4
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
index 2b7f77728..60b68c733 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java
@@ -29,9 +29,8 @@
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.replicator.common.LoggerName;
import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
import org.apache.rocketmq.replicator.exception.InitMQClientException;
import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
@@ -108,11 +107,11 @@ public Class extends Task> taskClass() {
@Override
public void validate(KeyValue config) {
- if (config.getInt(ConnectorConfig.MAX_TASK, 1) > 1) {
+ if (config.getInt(ReplicatorConnectorConfig.MAX_TASK, 1) > 1) {
log.warn("ReplicatorCheckpointConnector no need to set max-task, only used 1.");
}
// checkpoint just need only one task.
- config.put(ConnectorConfig.MAX_TASK, 1);
+ config.put(ReplicatorConnectorConfig.MAX_TASK, 1);
ReplicatorUtils.checkNeedParams(ReplicatorCheckpointConnector.class.getName(), config, neededParamKeys);
}
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
index 5a44ec614..b2d4b9702 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
@@ -34,14 +34,13 @@
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.replicator.common.LoggerName;
import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
import org.apache.rocketmq.replicator.exception.InitMQClientException;
import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -189,7 +188,6 @@ public List poll() throws InterruptedException {
return connectRecords;
}
- @NotNull
private static Struct buildCheckpointPayload(String srcTopicWithInstanceId, String srcConsumerGroupWithInstanceId,
long minSrcLasttimestamp, long minDestLasttimestamp) {
Struct struct = new Struct(VALUE_SCHEMA_V0);
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java
index ad54f0847..6e7f0d93d 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java
@@ -22,7 +22,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
@@ -31,7 +30,7 @@
import java.util.List;
import java.util.Set;
-import static org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig.CONNECT_TOPICNAME;
+import static org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig.CONNECT_TOPICNAME;
/**
* @author osgoo
@@ -78,11 +77,11 @@ public Class extends Task> taskClass() {
@Override
public void validate(KeyValue config) {
- if (config.getInt(ConnectorConfig.MAX_TASK) > 1) {
+ if (config.getInt(ReplicatorConnectorConfig.MAX_TASK) > 1) {
log.warn("ReplicatorHeartbeatConnector no need to set max-task, only used 1.");
}
// heartbeat just need only one task.
- config.put(ConnectorConfig.MAX_TASK, 1);
+ config.put(ReplicatorConnectorConfig.MAX_TASK, 1);
if (StringUtils.isNotBlank(config.getString(CONNECT_TOPICNAME))) {
log.warn("ReplicatorHeartbeatConnector no need to set " + CONNECT_TOPICNAME + ", use " + ReplicatorConnectorConfig.DEST_TOPIC + " instead.");
// use destInstanceId % destTopic for sink instead of CONNECT_TOPICNAME
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatTask.java
index 46ff63d2a..920bd00e0 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatTask.java
@@ -39,7 +39,6 @@
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.replicator.stats.ReplicatorTaskStats;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.slf4j.Logger;
@@ -135,7 +134,6 @@ public ConsumeConcurrentlyStatus consumeMessage(List list,
long storeTimestamp = messageExt.getStoreTimestamp();
long consumeTimestamp = System.currentTimeMillis();
long rt = consumeTimestamp - bornTimestamp;
- ReplicatorTaskStats.incItemValue(ReplicatorTaskStats.REPLICATOR_HEARTBEAT_DELAY_MS, connectorConfig.getConnectorId(), (int) rt, 1);
log.info(messageExt.getUserProperty("src") + " --> " + messageExt.getUserProperty("dest") + " RT " + bornTimestamp + "," + storeTimestamp + "," + consumeTimestamp);
consumerLastConsumeOk = consumeTimestamp;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
@@ -201,7 +199,6 @@ public void start(KeyValue config) {
log.info("ReplicatorHeartbeatTask connectorConfig : " + connectorConfig);
try {
- ReplicatorTaskStats.init();
// init consumer group
String destClusterName = connectorConfig.getDestCluster();
createAndUpdatePullConsumerGroup(destClusterName, consumeGroup);
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
index 04b4ca8a5..9854a112f 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
@@ -50,13 +50,11 @@
import org.apache.rocketmq.replicator.exception.GetMetaDataException;
import org.apache.rocketmq.replicator.exception.InitMQClientException;
import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
-import org.apache.rocketmq.connect.runtime.errors.ToleranceType;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import static org.apache.rocketmq.connect.runtime.config.ConnectorConfig.CONNECTOR_ID;
-import static org.apache.rocketmq.connect.runtime.config.ConnectorConfig.ERRORS_TOLERANCE_CONFIG;
-import static org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig.CONNECT_TOPICNAME;
+import static org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig.CONNECT_TOPICNAME;
+
/**
* @author osgoo
@@ -193,8 +191,8 @@ public List taskConfigs(int maxTasks) {
keyValue.put(ReplicatorConnectorConfig.DIVIDED_NORMAL_QUEUES, JSON.toJSONString(normalDivided.get(i)));
// CONNECTOR_ID is not fulfilled by rebalance
- keyValue.put(CONNECTOR_ID, connectorConfig.getString(CONNECTOR_ID));
- keyValue.put(ERRORS_TOLERANCE_CONFIG, connectorConfig.getString(ERRORS_TOLERANCE_CONFIG, ToleranceType.ALL.name()));
+// keyValue.put(CONNECTOR_ID, connectorConfig.getString(CONNECTOR_ID));
+// keyValue.put(ERRORS_TOLERANCE_CONFIG, connectorConfig.getString(ERRORS_TOLERANCE_CONFIG, ToleranceType.ALL.name()));
keyValue.put(ReplicatorConnectorConfig.SRC_CLOUD, connectorConfig.getString(ReplicatorConnectorConfig.SRC_CLOUD));
keyValue.put(ReplicatorConnectorConfig.SRC_REGION, connectorConfig.getString(ReplicatorConnectorConfig.SRC_REGION));
keyValue.put(ReplicatorConnectorConfig.SRC_CLUSTER, connectorConfig.getString(ReplicatorConnectorConfig.SRC_CLUSTER));
@@ -257,7 +255,6 @@ public Class extends Task> taskClass() {
add(ReplicatorConnectorConfig.DEST_ENDPOINT);
add(ReplicatorConnectorConfig.SRC_ACL_ENABLE);
add(ReplicatorConnectorConfig.DEST_ACL_ENABLE);
- add(ERRORS_TOLERANCE_CONFIG);
}
};
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
index f20cba48f..2d53cc282 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
@@ -37,20 +37,17 @@
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
-import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.replicator.common.LoggerName;
import org.apache.rocketmq.replicator.config.ConsumeFromWhere;
import org.apache.rocketmq.replicator.config.FailoverStrategy;
import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
import org.apache.rocketmq.replicator.context.UnAckMessage;
import org.apache.rocketmq.replicator.exception.StartTaskException;
-import org.apache.rocketmq.replicator.stats.ReplicatorTaskStats;
import org.apache.rocketmq.replicator.stats.TpsLimiter;
import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
@@ -65,10 +62,8 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_OFFSET;
-import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.TOPIC;
+import static org.apache.rocketmq.replicator.utils.ReplicatorUtils.QUEUE_OFFSET;
/**
* @author osgoo
@@ -366,13 +361,9 @@ public void accept(MessageQueue messageQueue, OffsetWrapper offsetWrapper) {
List delayMsKeys = new ArrayList<>();
String normalNumKey = connectorConfig.getConnectorId();
delayNumsKeys.add(normalNumKey);
- ReplicatorTaskStats.incItemValue(ReplicatorTaskStats.REPLICATOR_SOURCE_TASK_DELAY_NUMS, normalNumKey, (int) normalDelayCount.get(), 1);
String normalMsKey = connectorConfig.getConnectorId();
delayMsKeys.add(normalMsKey);
- ReplicatorTaskStats.incItemValue(ReplicatorTaskStats.REPLICATOR_SOURCE_TASK_DELAY_MS, normalMsKey, (int) normalDelayMs.get(), 1);
- metricsItem2KeyMap.put(ReplicatorTaskStats.REPLICATOR_SOURCE_TASK_DELAY_NUMS, delayNumsKeys);
- metricsItem2KeyMap.put(ReplicatorTaskStats.REPLICATOR_SOURCE_TASK_DELAY_MS, delayMsKeys);
} catch (RemotingException | MQClientException e) {
log.error("occur remoting or mqclient exception, retry build mqadminclient", e);
try {
@@ -510,10 +501,8 @@ private ConnectRecord convertToSinkDataEntry(MessageExt message) {
Long timestamp;
ConnectRecord sinkDataEntry = null;
- String connectTimestamp = properties.get(ConnectorConfig.CONNECT_TIMESTAMP);
+ String connectTimestamp = properties.get(ReplicatorConnectorConfig.CONNECT_TIMESTAMP);
timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.parseLong(connectTimestamp) : System.currentTimeMillis();
-// String connectSchema = properties.get(ConnectorConfig.CONNECT_SCHEMA);
-// schema = StringUtils.isNotEmpty(connectSchema) ? JSON.parseObject(connectSchema, Schema.class) : null;
Schema schema = SchemaBuilder.string().build();
byte[] body = message.getBody();
String destTopic = swapTopic(topic);
@@ -524,8 +513,8 @@ private ConnectRecord convertToSinkDataEntry(MessageExt message) {
log.error("swap topic got null, topic : " + topic);
}
}
- RecordPartition recordPartition = ConnectUtil.convertToRecordPartition(topic, message.getBrokerName(), message.getQueueId());
- RecordOffset recordOffset = ConnectUtil.convertToRecordOffset(message.getQueueOffset());
+ RecordPartition recordPartition = ReplicatorUtils.convertToRecordPartition(topic, message.getBrokerName(), message.getQueueId());
+ RecordOffset recordOffset = ReplicatorUtils.convertToRecordOffset(message.getQueueOffset());
String bodyStr = new String(body, StandardCharsets.UTF_8);
sinkDataEntry = new ConnectRecord(recordPartition, recordOffset, timestamp, schema, bodyStr);
KeyValue keyValue = new DefaultKeyValue();
@@ -576,7 +565,7 @@ private ConnectRecord convertToSinkDataEntry(MessageExt message) {
keyValue.put(REPLICATOR_SRC_MESSAGE_ID, message.getMsgId());
log.debug("addExtension : " + keyValue.keySet());
sinkDataEntry.addExtension(keyValue);
- sinkDataEntry.addExtension(TOPIC, destTopic);
+ sinkDataEntry.addExtension(ReplicatorUtils.TOPIC, destTopic);
return sinkDataEntry;
}
@@ -722,8 +711,6 @@ public void start(KeyValue config) {
createAndUpdatePullConsumerGroup(srcClusterName, pullConsumerGroup);
}
log.info("createAndUpdatePullConsumerGroup " + pullConsumerGroup + " finished.");
- ReplicatorTaskStats.init();
- log.info("TaskStats init.");
// init converter
// init pullConsumer
buildConsumer();
@@ -745,20 +732,6 @@ public void start(KeyValue config) {
}
- private void cleanMetricsMonitor() {
- metricsItem2KeyMap.forEach(new BiConsumer>() {
- @Override
- public void accept(String itemName, List itemKeys) {
- itemKeys.forEach(new Consumer() {
- @Override
- public void accept(String itemKey) {
- ReplicatorTaskStats.getConnectStatsManager().removeAdditionalItem(itemName, itemKey);
- }
- });
- }
- });
- }
-
private void cleanResource() {
try {
if (pullConsumer != null) {
@@ -767,7 +740,6 @@ private void cleanResource() {
if (metricsMonitorExecutorService != null) {
metricsMonitorExecutorService.shutdown();
}
- cleanMetricsMonitor();
} catch (Exception e) {
log.error("clean resource error,", e);
}
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/LoggerName.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/LoggerName.java
new file mode 100644
index 000000000..8091bb2a8
--- /dev/null
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/LoggerName.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.replicator.common;
+
+/**
+ * Define all the logger name of the runtime.
+ */
+public class LoggerName {
+ public static final String CONNECT_BUG = "ConnectBug";
+ public static final String REPLICATRO_RUNTIME = "ReplicatorRuntime";
+ public static final String WORKER_ERROR_MSG_ID = "WorkerErrorMsgId";
+}
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
index 6afdd7564..4d605464f 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
@@ -157,10 +157,15 @@ public class ReplicatorConnectorConfig {
public final static String DIVIDED_RETRY_QUEUES = "divided.retryqueues";
public final static String DIVIDED_DLQ_QUEUES = "divided.dlqqueues";
public final static String SYNC_TPS = "sync.tps";
- public final static String MAX_TASK = "max.task";
+ public final static String MAX_TASK = "max.tasks";
public final static String COMMIT_OFFSET_INTERVALS_MS = "commit.offset.interval.ms";
public final static String REQUEST_TASK_RECONFIG_INTERVAL_MS = "request.task.reconfig.ms";
+ public static final String CONNECT_TIMESTAMP = "connect.timestamp";
+
+ public static final String CONNECT_TOPICNAME = "connect.topicname";
+
+
public String getTaskId() {
return taskId;
}
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/stats/ReplicatorTaskStats.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/stats/ReplicatorTaskStats.java
deleted file mode 100644
index 4aef1c8e2..000000000
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/stats/ReplicatorTaskStats.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.replicator.stats;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * @author osgoo
- * @date 2022/6/22
- * depends on ConnectStatsManager
- */
-public class ReplicatorTaskStats {
- // todo log 输出到 rocketmq_client.log
- private static final Log log = LogFactory.getLog(ReplicatorTaskStats.class);
- // additional item
- private static boolean additionalStatsEnable = false;
- // statsItems
- public static final String REPLICATOR_SOURCE_TASK_DELAY_NUMS = "REPLICATOR_SOURCE_TASK_DELAY_NUMS";
- public static final String REPLICATOR_SOURCE_TASK_DELAY_MS = "REPLICATOR_SOURCE_TASK_DELAY_MS";
-
- // todo p2p delay, src born timestamp ---> sink dest timestamp
- // todo rpo 计算规则
- public static final String REPLICATOR_HEARTBEAT_DELAY_MS = "REPLICATOR_HEARTBEAT_DELAY_MS";
-
- public static List additionalItems = new ArrayList<>();
- static {
- additionalItems.add(REPLICATOR_SOURCE_TASK_DELAY_NUMS);
- additionalItems.add(REPLICATOR_SOURCE_TASK_DELAY_MS);
- additionalItems.add(REPLICATOR_HEARTBEAT_DELAY_MS);
- }
- private static ConnectStatsManager connectStatsManager;
-
- public static synchronized void init() {
- // init statsItem
- connectStatsManager = new ConnectStatsManager(UUID.randomUUID().toString());
- connectStatsManager.initAdditionalItems(ReplicatorTaskStats.additionalItems);
- additionalStatsEnable = true;
- log.info("Replicator added additional items.");
- }
-
- public static ConnectStatsManager getConnectStatsManager() {
- return connectStatsManager;
- }
-
- public static void incItemValue(String itemName, String key, int incValue, int incTimes) {
- if (!additionalStatsEnable || connectStatsManager == null || itemName == null || key == null) {
- log.warn("Replicator stats not enabled. connectStatsManager : " + connectStatsManager + ", itemName : " + itemName + ", key : " + key);
- return;
- }
- connectStatsManager.incAdditionalItem(itemName, key, incValue, incTimes);
- }
-
- public static void removeItem(String itemName, String key) {
- if (!additionalStatsEnable || connectStatsManager == null || itemName == null || key == null) {
- log.warn("Replicator stats not enabled. connectStatsManager : " + connectStatsManager + ", itemName : " + itemName + ", key : " + key);
- return;
- }
- connectStatsManager.removeAdditionalItem(itemName, key);
- }
-
-}
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
index fd28a9b73..28c627269 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
@@ -24,7 +24,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.replicator.common.LoggerName;
import org.apache.rocketmq.replicator.exception.ParamInvalidException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
@@ -38,7 +38,7 @@
import java.util.Map;
import java.util.Set;
-import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_OFFSET;
+
public class ReplicatorUtils {
private static final Logger log = LoggerFactory.getLogger(LoggerName.REPLICATRO_RUNTIME);
@@ -48,6 +48,8 @@ public class ReplicatorUtils {
public static final String UPSTREAM_LASTTIMESTAMP_KEY = "upstreamLastTimestamp";
public static final String DOWNSTREAM_LASTTIMESTAMP_KEY = "downstreamLastTimestamp";
public static final String METADATA_KEY = "metadata";
+ public static final String QUEUE_OFFSET = "queueOffset";
+ public static final String TOPIC = "topic";
public static String buildTopicWithNamespace(String topic, String instanceId) {
if (StringUtils.isBlank(instanceId)) {
@@ -91,6 +93,15 @@ public static RecordPartition convertToRecordPartition(String topic, String cons
return recordPartition;
}
+ public static RecordPartition convertToRecordPartition(String topic, String brokerName, int queueId) {
+ Map map = new HashMap<>();
+ map.put("topic", topic);
+ map.put("brokerName", brokerName);
+ map.put("queueId", queueId + "");
+ RecordPartition recordPartition = new RecordPartition(map);
+ return recordPartition;
+ }
+
public static RecordOffset convertToRecordOffset(Long offset) {
Map offsetMap = new HashMap<>();
offsetMap.put(QUEUE_OFFSET, offset + "");
@@ -114,4 +125,5 @@ public static void createTopic(DefaultMQAdminExt defaultMQAdminExt, TopicConfig
throw new RuntimeException("Create topic [" + topicConfig.getTopicName() + "] failed", e);
}
}
+
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
index 4e312e29b..855c08248 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
@@ -334,6 +334,7 @@ public void recomputeTaskConfigs(String connectorName, ConnectKeyValue configs)
// put task id
newKeyValue.put(ConnectorConfig.TASK_ID, taskId);
newKeyValue.put(ConnectorConfig.TASK_CLASS, connector.taskClass().getName());
+ newKeyValue.put(ConnectorConfig.CONNECTOR_CLASS, connector.getClass().getName());
// source topic
if (configs.containsKey(SourceConnectorConfig.CONNECT_TOPICNAME)) {
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializerTest.java
index 0536c549b..9535734c8 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializerTest.java
@@ -29,9 +29,9 @@ public void deserializeTest() {
ConnectKeyValueDeserializer deserializer = new ConnectKeyValueDeserializer();
ConnectKeyValue connectKeyValue = new ConnectKeyValue();
connectKeyValue.put("connector.topic", "testTopic");
- connectKeyValue.put("max.task", 2);
+ connectKeyValue.put("max.tasks", 2);
final ConnectKeyValue result = deserializer.deserialize("testTopic", JSON.toJSONBytes(connectKeyValue));
- Assert.assertEquals(2, result.getInt("max.task"));
+ Assert.assertEquals(2, result.getInt("max.tasks"));
Assert.assertEquals("testTopic", result.getString("connector.topic"));
}
}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializerTest.java
index c0f0ffa13..3acba0213 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializerTest.java
@@ -29,7 +29,7 @@ public void serializeTest() {
ConnectKeyValueSerializer serializer = new ConnectKeyValueSerializer();
ConnectKeyValue connectKeyValue = new ConnectKeyValue();
connectKeyValue.put("connect.topic", "testTopic");
- connectKeyValue.put("max.task", 2);
+ connectKeyValue.put("max.tasks", 2);
final byte[] result = serializer.serialize("testTopic", connectKeyValue);
Assert.assertEquals(new String(JSON.toJSONBytes(connectKeyValue)), new String(result));
}