From 0791d2b868b055c5a3aa7b6ebc27812729413fa5 Mon Sep 17 00:00:00 2001
From: fuweng11 <76141879+fuweng11@users.noreply.github.com>
Date: Fri, 22 Dec 2023 16:59:53 +0800
Subject: [PATCH] [INLONG-9510][Manager] Supports doris database
synchronization (#9511)
---
.../manager/common/consts/DataNodeType.java | 2 +
.../pojo/node/doris/DorisDataNodeDTO.java | 68 +++++++++++++++
.../pojo/node/doris/DorisDataNodeInfo.java | 49 +++++++++++
.../pojo/node/doris/DorisDataNodeRequest.java | 39 +++++++++
.../sort/node/provider/DorisProvider.java | 36 ++++++++
.../node/doris/DorisDataNodeOperator.java | 85 +++++++++++++++++++
.../service/sink/doris/DorisSinkOperator.java | 16 +++-
7 files changed, 292 insertions(+), 3 deletions(-)
create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/doris/DorisDataNodeDTO.java
create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/doris/DorisDataNodeInfo.java
create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/doris/DorisDataNodeRequest.java
create mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/doris/DorisDataNodeOperator.java
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index e14a98195fc..fda7ee72624 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -36,6 +36,8 @@ public class DataNodeType {
public static final String ORACLE = "ORACLE";
public static final String SQLSERVER = "SQLSERVER";
public static final String MONGODB = "MONGODB";
+ public static final String DORIS = "DORIS";
+
/**
* Tencent cloud log service
* Details: CLS
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/doris/DorisDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/doris/DorisDataNodeDTO.java
new file mode 100644
index 00000000000..fa580444a2b
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/doris/DorisDataNodeDTO.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.doris;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Doris data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@ApiModel("Doris data node info")
+public class DorisDataNodeDTO {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DorisDataNodeDTO.class);
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static DorisDataNodeDTO getFromRequest(DorisDataNodeRequest request, String extParams) throws Exception {
+ DorisDataNodeDTO dto = StringUtils.isNotBlank(extParams)
+ ? DorisDataNodeDTO.getFromJson(extParams)
+ : new DorisDataNodeDTO();
+ return CommonBeanUtils.copyProperties(request, dto, true);
+ }
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static DorisDataNodeDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, DorisDataNodeDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT,
+ String.format("Failed to parse extParams for Doris node: %s", e.getMessage()));
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/doris/DorisDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/doris/DorisDataNodeInfo.java
new file mode 100644
index 00000000000..99faafe8824
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/doris/DorisDataNodeInfo.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.doris;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Doris data node info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.DORIS)
+@ApiModel("Doris data node info")
+public class DorisDataNodeInfo extends DataNodeInfo {
+
+ public DorisDataNodeInfo() {
+ this.setType(DataNodeType.DORIS);
+ }
+
+ @Override
+ public DorisDataNodeRequest genRequest() {
+ return CommonBeanUtils.copyProperties(this, DorisDataNodeRequest::new);
+ }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/doris/DorisDataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/doris/DorisDataNodeRequest.java
new file mode 100644
index 00000000000..3416ec2e34a
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/doris/DorisDataNodeRequest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.doris;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Doris data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.DORIS)
+@ApiModel("Doris data node request")
+public class DorisDataNodeRequest extends DataNodeRequest {
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
index e4e71333c93..a87db386900 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
@@ -17,7 +17,9 @@
package org.apache.inlong.manager.pojo.sort.node.provider;
+import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.doris.DorisSink;
import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
import org.apache.inlong.manager.pojo.stream.StreamField;
@@ -28,12 +30,17 @@
import org.apache.inlong.sort.protocol.node.load.DorisLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* The Provider for creating Doris load nodes.
*/
+@Slf4j
public class DorisProvider implements LoadNodeProvider {
@Override
@@ -48,6 +55,7 @@ public LoadNode createLoadNode(StreamNode nodeInfo, Map con
List fieldInfos = parseSinkFieldInfos(dorisSink.getSinkFieldList(), dorisSink.getSinkName());
List fieldRelations = parseSinkFields(dorisSink.getSinkFieldList(), constantFieldMap);
Format format = parsingSinkMultipleFormat(dorisSink.getSinkMultipleEnable(), dorisSink.getSinkMultipleFormat());
+ log.info("Test sink doris pro username ={}", dorisSink);
return new DorisLoadNode(
dorisSink.getSinkName(),
@@ -68,4 +76,32 @@ public LoadNode createLoadNode(StreamNode nodeInfo, Map con
dorisSink.getDatabasePattern(),
dorisSink.getTablePattern());
}
+
+ @Override
+ public Boolean isSinkMultiple(StreamNode nodeInfo) {
+ DorisSink dorisSink = (DorisSink) nodeInfo;
+ return dorisSink.getSinkMultipleEnable();
+ }
+
+ @Override
+ public List addStreamFieldsForSinkMultiple(List streamFields) {
+ if (CollectionUtils.isEmpty(streamFields)) {
+ streamFields = new ArrayList<>();
+ }
+ streamFields.add(0,
+ new StreamField(0, "varbinary", MetaField.DATA_BYTES_CANAL.name(), "meta.data_canal", null, 1,
+ MetaField.DATA_BYTES_CANAL.name()));
+ return streamFields;
+ }
+
+ @Override
+ public List addSinkFieldsForSinkMultiple(List sinkFields) {
+ if (CollectionUtils.isEmpty(sinkFields)) {
+ sinkFields = new ArrayList<>();
+ }
+ sinkFields.add(0, new SinkField(0, "varbinary", MetaField.DATA_BYTES_CANAL.name(), "meta.data_canal",
+ MetaField.DATA_BYTES_CANAL.name(), "varbinary", 0, MetaField.DATA_BYTES_CANAL.name(), null));
+ return sinkFields;
+ }
+
}
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/doris/DorisDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/doris/DorisDataNodeOperator.java
new file mode 100644
index 00000000000..425f9f65924
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/doris/DorisDataNodeOperator.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.doris;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.doris.DorisDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.doris.DorisDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.doris.DorisDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DorisDataNodeOperator extends AbstractDataNodeOperator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DorisDataNodeOperator.class);
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String dataNodeType) {
+ LOGGER.info("test data type {}, actual type={}", dataNodeType, getDataNodeType());
+ return getDataNodeType().equals(dataNodeType);
+ }
+
+ @Override
+ public String getDataNodeType() {
+ return DataNodeType.DORIS;
+ }
+
+ @Override
+ public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+ }
+ DorisDataNodeInfo dorisDataNodeInfo = new DorisDataNodeInfo();
+ CommonBeanUtils.copyProperties(entity, dorisDataNodeInfo);
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ DorisDataNodeDTO dto = DorisDataNodeDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, dorisDataNodeInfo);
+ }
+ return dorisDataNodeInfo;
+ }
+
+ @Override
+ protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
+ DorisDataNodeRequest nodeRequest = (DorisDataNodeRequest) request;
+ CommonBeanUtils.copyProperties(nodeRequest, targetEntity, true);
+ try {
+ DorisDataNodeDTO dto = DorisDataNodeDTO.getFromRequest(nodeRequest, targetEntity.getExtParams());
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("Failed to build extParams for Doris node: %s", e.getMessage()));
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java
index 0ed068ac702..3c7d4c58eaa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java
@@ -22,6 +22,7 @@
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.node.doris.DorisDataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -87,12 +88,21 @@ public StreamSink getFromEntity(@NotNull StreamSinkEntity entity) {
DorisSinkDTO dto = DorisSinkDTO.getFromJson(entity.getExtParams());
if (StringUtils.isBlank(dto.getFeNodes())) {
- throw new BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
- "doris fe nodes is blank");
+ if (StringUtils.isBlank(entity.getDataNodeName())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ "doris fe nodes unspecified and data node is blank");
+ }
+ DorisDataNodeInfo dataNodeInfo = (DorisDataNodeInfo) dataNodeHelper.getDataNodeInfo(
+ entity.getDataNodeName(), entity.getSinkType());
+ dto.setFeNodes(dataNodeInfo.getUrl());
+ dto.setUsername(dataNodeInfo.getUsername());
+ dto.setPassword(dataNodeInfo.getToken());
+ CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
}
+
CommonBeanUtils.copyProperties(entity, sink, true);
CommonBeanUtils.copyProperties(dto, sink, true);
- List sinkFields = super.getSinkFields(entity.getId());
+ List sinkFields = getSinkFields(entity.getId());
sink.setSinkFieldList(sinkFields);
return sink;
}