Skip to content

Commit

Permalink
[INLONG-9510][Manager] Supports doris database synchronization (#9511)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuweng11 authored Dec 22, 2023
1 parent 1e25c54 commit 0791d2b
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class DataNodeType {
public static final String ORACLE = "ORACLE";
public static final String SQLSERVER = "SQLSERVER";
public static final String MONGODB = "MONGODB";
public static final String DORIS = "DORIS";

/**
* Tencent cloud log service
* Details: <a href="https://www.tencentcloud.com/products/cls">CLS</a>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.node.doris;

import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;

import io.swagger.annotations.ApiModel;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.validation.constraints.NotNull;

/**
* Doris data node info
*/
@Data
@Builder
@NoArgsConstructor
@ApiModel("Doris data node info")
public class DorisDataNodeDTO {

private static final Logger LOGGER = LoggerFactory.getLogger(DorisDataNodeDTO.class);

/**
* Get the dto instance from the request
*/
public static DorisDataNodeDTO getFromRequest(DorisDataNodeRequest request, String extParams) throws Exception {
DorisDataNodeDTO dto = StringUtils.isNotBlank(extParams)
? DorisDataNodeDTO.getFromJson(extParams)
: new DorisDataNodeDTO();
return CommonBeanUtils.copyProperties(request, dto, true);
}

/**
* Get the dto instance from the JSON string.
*/
public static DorisDataNodeDTO getFromJson(@NotNull String extParams) {
try {
return JsonUtils.parseObject(extParams, DorisDataNodeDTO.class);
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT,
String.format("Failed to parse extParams for Doris node: %s", e.getMessage()));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.node.doris;

import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;

import io.swagger.annotations.ApiModel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* Doris data node info
*/
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = DataNodeType.DORIS)
@ApiModel("Doris data node info")
public class DorisDataNodeInfo extends DataNodeInfo {

public DorisDataNodeInfo() {
this.setType(DataNodeType.DORIS);
}

@Override
public DorisDataNodeRequest genRequest() {
return CommonBeanUtils.copyProperties(this, DorisDataNodeRequest::new);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.node.doris;

import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;

import io.swagger.annotations.ApiModel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* Doris data node request
*/
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = DataNodeType.DORIS)
@ApiModel("Doris data node request")
public class DorisDataNodeRequest extends DataNodeRequest {

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.inlong.manager.pojo.sort.node.provider;

import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.doris.DorisSink;
import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
import org.apache.inlong.manager.pojo.stream.StreamField;
Expand All @@ -28,12 +30,17 @@
import org.apache.inlong.sort.protocol.node.load.DorisLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* The Provider for creating Doris load nodes.
*/
@Slf4j
public class DorisProvider implements LoadNodeProvider {

@Override
Expand All @@ -48,6 +55,7 @@ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> con
List<FieldInfo> fieldInfos = parseSinkFieldInfos(dorisSink.getSinkFieldList(), dorisSink.getSinkName());
List<FieldRelation> fieldRelations = parseSinkFields(dorisSink.getSinkFieldList(), constantFieldMap);
Format format = parsingSinkMultipleFormat(dorisSink.getSinkMultipleEnable(), dorisSink.getSinkMultipleFormat());
log.info("Test sink doris pro username ={}", dorisSink);

return new DorisLoadNode(
dorisSink.getSinkName(),
Expand All @@ -68,4 +76,32 @@ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> con
dorisSink.getDatabasePattern(),
dorisSink.getTablePattern());
}

@Override
public Boolean isSinkMultiple(StreamNode nodeInfo) {
DorisSink dorisSink = (DorisSink) nodeInfo;
return dorisSink.getSinkMultipleEnable();
}

@Override
public List<StreamField> addStreamFieldsForSinkMultiple(List<StreamField> streamFields) {
if (CollectionUtils.isEmpty(streamFields)) {
streamFields = new ArrayList<>();
}
streamFields.add(0,
new StreamField(0, "varbinary", MetaField.DATA_BYTES_CANAL.name(), "meta.data_canal", null, 1,
MetaField.DATA_BYTES_CANAL.name()));
return streamFields;
}

@Override
public List<SinkField> addSinkFieldsForSinkMultiple(List<SinkField> sinkFields) {
if (CollectionUtils.isEmpty(sinkFields)) {
sinkFields = new ArrayList<>();
}
sinkFields.add(0, new SinkField(0, "varbinary", MetaField.DATA_BYTES_CANAL.name(), "meta.data_canal",
MetaField.DATA_BYTES_CANAL.name(), "varbinary", 0, MetaField.DATA_BYTES_CANAL.name(), null));
return sinkFields;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.service.node.doris;

import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
import org.apache.inlong.manager.pojo.node.doris.DorisDataNodeDTO;
import org.apache.inlong.manager.pojo.node.doris.DorisDataNodeInfo;
import org.apache.inlong.manager.pojo.node.doris.DorisDataNodeRequest;
import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class DorisDataNodeOperator extends AbstractDataNodeOperator {

private static final Logger LOGGER = LoggerFactory.getLogger(DorisDataNodeOperator.class);

@Autowired
private ObjectMapper objectMapper;

@Override
public Boolean accept(String dataNodeType) {
LOGGER.info("test data type {}, actual type={}", dataNodeType, getDataNodeType());
return getDataNodeType().equals(dataNodeType);
}

@Override
public String getDataNodeType() {
return DataNodeType.DORIS;
}

@Override
public DataNodeInfo getFromEntity(DataNodeEntity entity) {
if (entity == null) {
throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
}
DorisDataNodeInfo dorisDataNodeInfo = new DorisDataNodeInfo();
CommonBeanUtils.copyProperties(entity, dorisDataNodeInfo);
if (StringUtils.isNotBlank(entity.getExtParams())) {
DorisDataNodeDTO dto = DorisDataNodeDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(dto, dorisDataNodeInfo);
}
return dorisDataNodeInfo;
}

@Override
protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
DorisDataNodeRequest nodeRequest = (DorisDataNodeRequest) request;
CommonBeanUtils.copyProperties(nodeRequest, targetEntity, true);
try {
DorisDataNodeDTO dto = DorisDataNodeDTO.getFromRequest(nodeRequest, targetEntity.getExtParams());
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
String.format("Failed to build extParams for Doris node: %s", e.getMessage()));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.node.doris.DorisDataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
Expand Down Expand Up @@ -87,12 +88,21 @@ public StreamSink getFromEntity(@NotNull StreamSinkEntity entity) {

DorisSinkDTO dto = DorisSinkDTO.getFromJson(entity.getExtParams());
if (StringUtils.isBlank(dto.getFeNodes())) {
throw new BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
"doris fe nodes is blank");
if (StringUtils.isBlank(entity.getDataNodeName())) {
throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
"doris fe nodes unspecified and data node is blank");
}
DorisDataNodeInfo dataNodeInfo = (DorisDataNodeInfo) dataNodeHelper.getDataNodeInfo(
entity.getDataNodeName(), entity.getSinkType());
dto.setFeNodes(dataNodeInfo.getUrl());
dto.setUsername(dataNodeInfo.getUsername());
dto.setPassword(dataNodeInfo.getToken());
CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
}

CommonBeanUtils.copyProperties(entity, sink, true);
CommonBeanUtils.copyProperties(dto, sink, true);
List<SinkField> sinkFields = super.getSinkFields(entity.getId());
List<SinkField> sinkFields = getSinkFields(entity.getId());
sink.setSinkFieldList(sinkFields);
return sink;
}
Expand Down

0 comments on commit 0791d2b

Please sign in to comment.