From 8c3de96159e7d476f780cb0ab295859851379647 Mon Sep 17 00:00:00 2001 From: doubleDimple Date: Fri, 25 Mar 2022 22:59:35 +0800 Subject: [PATCH 1/5] support RedisSourceConnector --- .../connect/redis/connector/RedisSourceConnector.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceConnector.java b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceConnector.java index aadc585c..6da4461c 100644 --- a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceConnector.java +++ b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceConnector.java @@ -22,6 +22,7 @@ import io.openmessaging.KeyValue; import io.openmessaging.connector.api.Task; import io.openmessaging.connector.api.source.SourceConnector; +import org.apache.commons.lang.StringUtils; import org.apache.rocketmq.connect.redis.common.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,8 +36,9 @@ public class RedisSourceConnector extends SourceConnector { @Override public String verifyAndSetConfig(KeyValue keyValue) { this.keyValue = keyValue; String msg = Config.checkConfig(keyValue); - if (msg != null) { - return msg; + if (StringUtils.isNotBlank(msg)){ + LOGGER.error("verify error:[{}]",msg); + throw new RuntimeException("verify error"); } return null; } From 9158245d067eebdacb13eedc6c397bf2c2a2f58c Mon Sep 17 00:00:00 2001 From: doubleDimple Date: Tue, 29 Mar 2022 18:12:40 +0800 Subject: [PATCH 2/5] support redis sink --- .../redis/{common => config}/Config.java | 4 +- .../redis/connector/RedisSinkConnector.java | 65 ++++++++++++++++ .../redis/connector/RedisSinkTask.java | 77 +++++++++++++++++++ .../redis/connector/RedisSourceConnector.java | 8 +- .../redis/connector/RedisSourceTask.java | 2 +- .../handler/DefaultRedisEventHandler.java | 2 +- .../processor/DefaultRedisEventProcessor.java | 2 +- .../redis/processor/RedisEventListener.java | 2 +- .../redis/test/common/ConfigTest.java | 2 +- .../converter/RedisPositionConverterTest.java | 2 +- .../test/handler/RedisEventHandlerTest.java | 2 +- .../redis/test/processor/ListenerTest.java | 2 +- .../redis/test/processor/ProcessorTest.java | 2 +- 13 files changed, 157 insertions(+), 15 deletions(-) rename connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/{common => config}/Config.java (97%) create mode 100644 connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkConnector.java create mode 100644 connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkTask.java diff --git a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/common/Config.java b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/config/Config.java similarity index 97% rename from connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/common/Config.java rename to connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/config/Config.java index fe410755..0c82779f 100644 --- a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/common/Config.java +++ b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/config/Config.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.connect.redis.common; +package org.apache.rocketmq.connect.redis.config; import java.net.URISyntaxException; import java.nio.ByteBuffer; @@ -26,6 +26,8 @@ import com.moilioncircle.redis.replicator.RedisURI; import org.apache.commons.lang.StringUtils; +import org.apache.rocketmq.connect.redis.common.RedisConstants; +import org.apache.rocketmq.connect.redis.common.SyncMod; import org.apache.rocketmq.connect.redis.util.PropertyToObjectUtils; import io.openmessaging.KeyValue; import org.slf4j.Logger; diff --git a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkConnector.java b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkConnector.java new file mode 100644 index 00000000..e02e3f0a --- /dev/null +++ b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkConnector.java @@ -0,0 +1,65 @@ +package org.apache.rocketmq.connect.redis.connector; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.Task; +import io.openmessaging.connector.api.sink.SinkConnector; +import org.apache.rocketmq.connect.redis.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; + +/** + * author: doubleDimple + */ +public class RedisSinkConnector extends SinkConnector { + + private static final Logger LOGGER = LoggerFactory.getLogger(RedisSinkConnector.class); + + private volatile boolean configValid = false; + private volatile boolean adminStarted; + private KeyValue keyValue; + + @Override + public String verifyAndSetConfig(KeyValue config) { + this.keyValue = config; + String msg = Config.checkConfig(keyValue); + if (msg != null) { + return msg; + } + this.configValid = true; + return null; + } + + @Override + public void start() { + LOGGER.info("the redisSinkConnector is start..."); + } + + @Override + public void stop() { + + } + + @Override + public void pause() { + + } + + @Override + public void resume() { + + } + + @Override + public Class taskClass() { + return RedisSinkTask.class; + } + + @Override + public List taskConfigs() { + List keyValues = new ArrayList<>(); + keyValues.add(this.keyValue); + return keyValues; + } +} diff --git a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkTask.java b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkTask.java new file mode 100644 index 00000000..bd41767c --- /dev/null +++ b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkTask.java @@ -0,0 +1,77 @@ +package org.apache.rocketmq.connect.redis.connector; + +import com.alibaba.fastjson.JSONObject; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.common.QueueMetaData; +import io.openmessaging.connector.api.data.Field; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SinkDataEntry; +import io.openmessaging.connector.api.sink.SinkTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * author doubleDimple + */ +public class RedisSinkTask extends SinkTask { + + private static final Logger LOGGER = LoggerFactory.getLogger(RedisSinkTask.class); + + @Override + public void put(Collection sinkDataEntries) { + //save data from MQ to redis + for (SinkDataEntry record : sinkDataEntries) { + Map fieldMap = new HashMap<>(); + Object[] payloads = record.getPayload(); + Schema schema = record.getSchema(); + //判断schema的datasource,是不是redis,如果是redis,按照redis命令存储, + //如果不是redis,则直接将数据存储到redis的集合中 + List fields = schema.getFields(); + Boolean parseError = false; + if (!fields.isEmpty()) { + for (Field field : fields) { + Object fieldValue = payloads[field.getIndex()]; + Object[] value = JSONObject.parseArray((String)fieldValue).toArray(); + if (value.length == 2) { + fieldMap.put(field, value); + } else { + LOGGER.error("parseArray error, fieldValue:{}", fieldValue); + parseError = true; + } + } + } + if (!parseError) { + + } + } + } + + @Override + public void commit(Map offsets) { + + } + + @Override + public void start(KeyValue config) { + + } + + @Override + public void stop() { + + } + + @Override + public void pause() { + + } + + @Override + public void resume() { + + } +} diff --git a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceConnector.java b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceConnector.java index 6da4461c..3e8b9831 100644 --- a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceConnector.java +++ b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceConnector.java @@ -22,8 +22,7 @@ import io.openmessaging.KeyValue; import io.openmessaging.connector.api.Task; import io.openmessaging.connector.api.source.SourceConnector; -import org.apache.commons.lang.StringUtils; -import org.apache.rocketmq.connect.redis.common.Config; +import org.apache.rocketmq.connect.redis.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,9 +35,8 @@ public class RedisSourceConnector extends SourceConnector { @Override public String verifyAndSetConfig(KeyValue keyValue) { this.keyValue = keyValue; String msg = Config.checkConfig(keyValue); - if (StringUtils.isNotBlank(msg)){ - LOGGER.error("verify error:[{}]",msg); - throw new RuntimeException("verify error"); + if (msg != null) { + return msg; } return null; } diff --git a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceTask.java b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceTask.java index 1b978367..66a78362 100644 --- a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceTask.java +++ b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceTask.java @@ -25,7 +25,7 @@ import io.openmessaging.KeyValue; import io.openmessaging.connector.api.data.SourceDataEntry; import io.openmessaging.connector.api.source.SourceTask; -import org.apache.rocketmq.connect.redis.common.Config; +import org.apache.rocketmq.connect.redis.config.Config; import org.apache.rocketmq.connect.redis.common.Options; import org.apache.rocketmq.connect.redis.converter.KVEntryConverter; import org.apache.rocketmq.connect.redis.converter.RedisEntryConverter; diff --git a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/handler/DefaultRedisEventHandler.java b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/handler/DefaultRedisEventHandler.java index 14362b9b..de346f8b 100644 --- a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/handler/DefaultRedisEventHandler.java +++ b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/handler/DefaultRedisEventHandler.java @@ -23,7 +23,7 @@ import com.moilioncircle.redis.replicator.event.Event; import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair; import com.moilioncircle.redis.replicator.rdb.iterable.datatype.BatchedKeyValuePair; -import org.apache.rocketmq.connect.redis.common.Config; +import org.apache.rocketmq.connect.redis.config.Config; import org.apache.rocketmq.connect.redis.common.SyncMod; import org.apache.rocketmq.connect.redis.parser.DefaultRedisRdbParser; import org.apache.rocketmq.connect.redis.parser.RedisRdbParser; diff --git a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/processor/DefaultRedisEventProcessor.java b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/processor/DefaultRedisEventProcessor.java index cde43671..e03d9523 100644 --- a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/processor/DefaultRedisEventProcessor.java +++ b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/processor/DefaultRedisEventProcessor.java @@ -40,7 +40,7 @@ import com.moilioncircle.redis.replicator.Replicator; import com.moilioncircle.redis.replicator.event.EventListener; import org.apache.commons.lang.StringUtils; -import org.apache.rocketmq.connect.redis.common.Config; +import org.apache.rocketmq.connect.redis.config.Config; import org.apache.rocketmq.connect.redis.common.RedisConstants; import org.apache.rocketmq.connect.redis.common.SyncMod; import org.apache.rocketmq.connect.redis.handler.RedisEventHandler; diff --git a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/processor/RedisEventListener.java b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/processor/RedisEventListener.java index 57d5aa84..0200b752 100644 --- a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/processor/RedisEventListener.java +++ b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/processor/RedisEventListener.java @@ -25,7 +25,7 @@ import com.moilioncircle.redis.replicator.event.PreRdbSyncEvent; import com.moilioncircle.redis.replicator.rdb.datatype.AuxField; import java.io.IOException; -import org.apache.rocketmq.connect.redis.common.Config; +import org.apache.rocketmq.connect.redis.config.Config; import org.apache.rocketmq.connect.redis.pojo.RedisEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/common/ConfigTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/common/ConfigTest.java index e63f45fd..8866e874 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/common/ConfigTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/common/ConfigTest.java @@ -6,7 +6,7 @@ import com.moilioncircle.redis.replicator.RedisURI; import io.openmessaging.KeyValue; import io.openmessaging.internal.DefaultKeyValue; -import org.apache.rocketmq.connect.redis.common.Config; +import org.apache.rocketmq.connect.redis.config.Config; import org.apache.rocketmq.connect.redis.common.SyncMod; import org.junit.Assert; import org.junit.Test; diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/converter/RedisPositionConverterTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/converter/RedisPositionConverterTest.java index 03986d22..389d8232 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/converter/RedisPositionConverterTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/converter/RedisPositionConverterTest.java @@ -19,7 +19,7 @@ import com.alibaba.fastjson.JSONObject; import java.nio.ByteBuffer; -import org.apache.rocketmq.connect.redis.common.Config; +import org.apache.rocketmq.connect.redis.config.Config; import org.apache.rocketmq.connect.redis.common.RedisConstants; import org.apache.rocketmq.connect.redis.converter.RedisPositionConverter; import org.junit.Assert; diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/handler/RedisEventHandlerTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/handler/RedisEventHandlerTest.java index 068187f8..2ddcb25c 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/handler/RedisEventHandlerTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/handler/RedisEventHandlerTest.java @@ -18,7 +18,7 @@ import com.moilioncircle.redis.replicator.rdb.iterable.datatype.BatchedKeyStringValueString; import com.moilioncircle.redis.replicator.rdb.iterable.datatype.BatchedKeyValuePair; import io.openmessaging.connector.api.data.FieldType; -import org.apache.rocketmq.connect.redis.common.Config; +import org.apache.rocketmq.connect.redis.config.Config; import org.apache.rocketmq.connect.redis.common.SyncMod; import org.apache.rocketmq.connect.redis.handler.DefaultRedisEventHandler; import org.apache.rocketmq.connect.redis.handler.RedisEventHandler; diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/processor/ListenerTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/processor/ListenerTest.java index 6500eba9..ee9b8d61 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/processor/ListenerTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/processor/ListenerTest.java @@ -6,7 +6,7 @@ import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString; import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair; import java.io.IOException; -import org.apache.rocketmq.connect.redis.common.Config; +import org.apache.rocketmq.connect.redis.config.Config; import org.apache.rocketmq.connect.redis.handler.DefaultRedisEventHandler; import org.apache.rocketmq.connect.redis.handler.RedisEventHandler; import org.apache.rocketmq.connect.redis.processor.DefaultRedisEventProcessor; diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/processor/ProcessorTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/processor/ProcessorTest.java index 1410b2a2..8472f261 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/processor/ProcessorTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/processor/ProcessorTest.java @@ -7,7 +7,7 @@ import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet; import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString; import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair; -import org.apache.rocketmq.connect.redis.common.Config; +import org.apache.rocketmq.connect.redis.config.Config; import org.apache.rocketmq.connect.redis.common.SyncMod; import org.apache.rocketmq.connect.redis.handler.DefaultRedisEventHandler; import org.apache.rocketmq.connect.redis.handler.RedisEventHandler; From 835906328a45fe39257bb652fde07456ca108aa6 Mon Sep 17 00:00:00 2001 From: doubleDimple Date: Tue, 29 Mar 2022 21:41:14 +0800 Subject: [PATCH 3/5] support RedisSourceConnector --- .../redis/connector/RedisSinkConnector.java | 17 +++++++++++++++++ .../connect/redis/connector/RedisSinkTask.java | 17 +++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkConnector.java b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkConnector.java index e02e3f0a..43da131f 100644 --- a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkConnector.java +++ b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkConnector.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.connect.redis.connector; import io.openmessaging.KeyValue; diff --git a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkTask.java b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkTask.java index bd41767c..d9a87505 100644 --- a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkTask.java +++ b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkTask.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.connect.redis.connector; import com.alibaba.fastjson.JSONObject; From 20ab758300454cfff66a4e70ae73d31996ddf201 Mon Sep 17 00:00:00 2001 From: doubleDimple Date: Wed, 30 Mar 2022 10:26:16 +0800 Subject: [PATCH 4/5] ADD redis mode licence --- connectors/rocketmq-connect-redis/pom.xml | 27 +++++++++++++++++++ .../redis/test/common/ConfigTest.java | 17 ++++++++++++ .../redis/test/common/OptionsTest.java | 17 ++++++++++++ .../connector/RedisSourceConnectorTest.java | 17 ++++++++++++ .../test/handler/RedisEventHandlerTest.java | 17 ++++++++++++ .../redis/test/parser/ParserTest.java | 17 ++++++++++++ .../rocketmq/redis/test/pojo/GeoTest.java | 17 ++++++++++++ .../rocketmq/redis/test/pojo/KVEntryTest.java | 17 ++++++++++++ .../redis/test/pojo/RedisEventTest.java | 17 ++++++++++++ .../redis/test/processor/ListenerTest.java | 17 ++++++++++++ .../redis/test/processor/ProcessorTest.java | 17 ++++++++++++ .../redis/test/util/ParseStringUtilsTest.java | 17 ++++++++++++ .../test/util/PropertyToObjectUtilsTest.java | 17 ++++++++++++ 13 files changed, 231 insertions(+) diff --git a/connectors/rocketmq-connect-redis/pom.xml b/connectors/rocketmq-connect-redis/pom.xml index 62f79502..12fa014d 100644 --- a/connectors/rocketmq-connect-redis/pom.xml +++ b/connectors/rocketmq-connect-redis/pom.xml @@ -1,4 +1,20 @@ + @@ -107,6 +123,17 @@ ${project.build.sourceEncoding} + + org.apache.rat + apache-rat-plugin + 0.12 + + + README.md + README-CN.md + + + org.codehaus.mojo findbugs-maven-plugin diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/common/ConfigTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/common/ConfigTest.java index 8866e874..4e01164b 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/common/ConfigTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/common/ConfigTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.redis.test.common; import java.nio.ByteBuffer; diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/common/OptionsTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/common/OptionsTest.java index a7ac8ae5..deeebb96 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/common/OptionsTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/common/OptionsTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.redis.test.common; import org.apache.rocketmq.connect.redis.common.Options; diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/connector/RedisSourceConnectorTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/connector/RedisSourceConnectorTest.java index 355351bb..64fb1884 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/connector/RedisSourceConnectorTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/connector/RedisSourceConnectorTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.redis.test.connector; import java.util.List; diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/handler/RedisEventHandlerTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/handler/RedisEventHandlerTest.java index 2ddcb25c..1ea3aa07 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/handler/RedisEventHandlerTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/handler/RedisEventHandlerTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.redis.test.handler; import com.moilioncircle.redis.replicator.cmd.impl.GetSetCommand; diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/parser/ParserTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/parser/ParserTest.java index b2b79607..abd0d4ab 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/parser/ParserTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/parser/ParserTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.redis.test.parser; import java.util.List; diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/pojo/GeoTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/pojo/GeoTest.java index c905140c..73286ab4 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/pojo/GeoTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/pojo/GeoTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.redis.test.pojo; import org.apache.rocketmq.connect.redis.pojo.Geo; diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/pojo/KVEntryTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/pojo/KVEntryTest.java index 65d8bcf3..d3cd7849 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/pojo/KVEntryTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/pojo/KVEntryTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.redis.test.pojo; import java.util.ArrayList; diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/pojo/RedisEventTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/pojo/RedisEventTest.java index a412e6d6..2a585e2b 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/pojo/RedisEventTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/pojo/RedisEventTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.redis.test.pojo; import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString; diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/processor/ListenerTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/processor/ListenerTest.java index ee9b8d61..6cbede36 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/processor/ListenerTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/processor/ListenerTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.redis.test.processor; import com.moilioncircle.redis.replicator.CloseListener; diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/processor/ProcessorTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/processor/ProcessorTest.java index 8472f261..f0383d06 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/processor/ProcessorTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/processor/ProcessorTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.redis.test.processor; import java.io.IOException; diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/util/ParseStringUtilsTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/util/ParseStringUtilsTest.java index 2a67959f..144f2f88 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/util/ParseStringUtilsTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/util/ParseStringUtilsTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.redis.test.util; import java.util.List; diff --git a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/util/PropertyToObjectUtilsTest.java b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/util/PropertyToObjectUtilsTest.java index ddcc59b9..d2ab8295 100644 --- a/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/util/PropertyToObjectUtilsTest.java +++ b/connectors/rocketmq-connect-redis/src/test/java/org/apache/rocketmq/redis/test/util/PropertyToObjectUtilsTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.redis.test.util; import java.lang.reflect.InvocationTargetException; From f79cd4c29d794ca5c76a16d24cbec56d979b4f42 Mon Sep 17 00:00:00 2001 From: doubleDimple Date: Thu, 7 Apr 2022 18:20:18 +0800 Subject: [PATCH 5/5] RedisSinkTask commit --- .../redis/connector/RedisSinkTask.java | 79 ++++++++++++++++--- .../connect/redis/sink/RedisUpdater.java | 15 ++++ 2 files changed, 85 insertions(+), 9 deletions(-) create mode 100644 connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/sink/RedisUpdater.java diff --git a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkTask.java b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkTask.java index d9a87505..e9460a1c 100644 --- a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkTask.java +++ b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSinkTask.java @@ -20,12 +20,22 @@ import com.alibaba.fastjson.JSONObject; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.common.QueueMetaData; +import io.openmessaging.connector.api.data.EntryType; import io.openmessaging.connector.api.data.Field; import io.openmessaging.connector.api.data.Schema; import io.openmessaging.connector.api.data.SinkDataEntry; import io.openmessaging.connector.api.sink.SinkTask; +import org.apache.rocketmq.connect.redis.config.Config; +import org.apache.rocketmq.connect.redis.converter.KVEntryConverter; +import org.apache.rocketmq.connect.redis.converter.RedisEntryConverter; +import org.apache.rocketmq.connect.redis.handler.DefaultRedisEventHandler; +import org.apache.rocketmq.connect.redis.handler.RedisEventHandler; +import org.apache.rocketmq.connect.redis.processor.DefaultRedisEventProcessor; +import org.apache.rocketmq.connect.redis.processor.RedisEventProcessor; +import org.apache.rocketmq.connect.redis.sink.RedisUpdater; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -38,15 +48,40 @@ public class RedisSinkTask extends SinkTask { private static final Logger LOGGER = LoggerFactory.getLogger(RedisSinkTask.class); + private RedisUpdater updater; + + /** + * listening and handle Redis event. + */ + private RedisEventProcessor eventProcessor; + private Config config; + /** + * convert kVEntry to list of sourceDataEntry + */ + private KVEntryConverter kvEntryConverter; + + public RedisEventProcessor getEventProcessor() { + return eventProcessor; + } + + public void setEventProcessor(RedisEventProcessor eventProcessor) { + this.eventProcessor = eventProcessor; + } + + public Config getConfig() { + return config; + } + @Override public void put(Collection sinkDataEntries) { //save data from MQ to redis - for (SinkDataEntry record : sinkDataEntries) { + for (SinkDataEntry sinkDataEntry : sinkDataEntries) { Map fieldMap = new HashMap<>(); - Object[] payloads = record.getPayload(); - Schema schema = record.getSchema(); - //判断schema的datasource,是不是redis,如果是redis,按照redis命令存储, - //如果不是redis,则直接将数据存储到redis的集合中 + Object[] payloads = sinkDataEntry.getPayload(); + + Schema schema = sinkDataEntry.getSchema(); + EntryType entryType = sinkDataEntry.getEntryType(); + List fields = schema.getFields(); Boolean parseError = false; if (!fields.isEmpty()) { @@ -62,7 +97,10 @@ public void put(Collection sinkDataEntries) { } } if (!parseError) { - + Boolean isSuccess = updater.push(fieldMap, entryType); + if (!isSuccess) { + LOGGER.error("push data error, entryType:{}, fieldMap:{}", fieldMap, entryType); + } } } } @@ -73,13 +111,36 @@ public void commit(Map offsets) { } @Override - public void start(KeyValue config) { - + public void start(KeyValue keyValue) { + this.kvEntryConverter = new RedisEntryConverter(); + + this.config = new Config(); + this.config.load(keyValue); + LOGGER.info("task config msg: {}", this.config.toString()); + + this.eventProcessor = new DefaultRedisEventProcessor(config); + RedisEventHandler eventHandler = new DefaultRedisEventHandler(this.config); + this.eventProcessor.registEventHandler(eventHandler); + try { + this.eventProcessor.start(); + LOGGER.info("Redis task start."); + } catch (IOException e) { + e.printStackTrace(); + LOGGER.error("processor start error: [{}]", e.getMessage()); + this.stop(); + } } @Override public void stop() { - + if (this.eventProcessor != null) { + try { + this.eventProcessor.stop(); + LOGGER.info("Redis task is stopped."); + } catch (IOException e) { + LOGGER.error("processor stop error: {}", e); + } + } } @Override diff --git a/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/sink/RedisUpdater.java b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/sink/RedisUpdater.java new file mode 100644 index 00000000..e55caefe --- /dev/null +++ b/connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/sink/RedisUpdater.java @@ -0,0 +1,15 @@ +package org.apache.rocketmq.connect.redis.sink; + +import io.openmessaging.connector.api.data.EntryType; +import io.openmessaging.connector.api.data.Field; + +import java.util.Map; + +public class RedisUpdater { + + + public Boolean push(Map fieldMap, EntryType entryType) { + + return null; + } +}