Skip to content

Commit

Permalink
[INLONG-9508][Manager] Add Iceberg field type mapping strategy to imp…
Browse files Browse the repository at this point in the history
…rove usability (#9545)

Co-authored-by: chestnufang <[email protected]>
  • Loading branch information
chestnut-c and chestnufang authored Jan 2, 2024
1 parent 869aeb0 commit bc73361
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.common.fieldtype.strategy;

import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.fieldtype.FieldTypeMappingReader;

import org.apache.commons.lang3.StringUtils;

import static org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;

/**
* The iceberg field type mapping strategy
*/
public class IcebergFieldTypeStrategy implements FieldTypeMappingStrategy {

private final FieldTypeMappingReader reader;

public IcebergFieldTypeStrategy() {
this.reader = new FieldTypeMappingReader(DataNodeType.ICEBERG);
}

@Override
public String getFieldTypeMapping(String sourceType) {
String dataType = StringUtils.substringBefore(sourceType, LEFT_BRACKET).toUpperCase();
return reader.getFIELD_TYPE_MAPPING_MAP().getOrDefault(dataType, sourceType.toUpperCase());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

source.type.to.target.type.converter:

- source.type: INTEGER
target.type: INT

- source.type: FLOAT
target.type: FLOAT

- source.type: DOUBLE
target.type: DOUBLE

- source.type: LONG
target.type: LONG

- source.type: DECIMAL
target.type: DECIMAL

- source.type: BOOLEAN
target.type: BOOLEAN

- source.type: DATE
target.type: DATE

- source.type: TIME
target.type: TIME

- source.type: TIMESTAMP
target.type: LOCAL_ZONE_TIMESTAMP

- source.type: TIMESTAMP WITH TIMEZONE
target.type: LOCAL_ZONE_TIMESTAMP

- source.type: TIMESTAMP WITHOUT TIMEZONE
target.type: TIMESTAMP

- source.type: UUID
target.type: STRING

- source.type: STRING
target.type: STRING

- source.type: FIXED
target.type: VARBINARY

- source.type: BINARY
target.type: BINARY
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.manager.common.consts.StreamType;
import org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
import org.apache.inlong.manager.common.fieldtype.strategy.IcebergFieldTypeStrategy;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
Expand Down Expand Up @@ -51,6 +53,8 @@
@Slf4j
public class IcebergProvider implements ExtractNodeProvider, LoadNodeProvider {

private static final FieldTypeMappingStrategy FIELD_TYPE_MAPPING_STRATEGY = new IcebergFieldTypeStrategy();

@Override
public Boolean accept(String sinkType) {
return StreamType.ICEBERG.equals(sinkType);
Expand All @@ -59,7 +63,8 @@ public Boolean accept(String sinkType) {
@Override
public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
IcebergSource icebergSource = (IcebergSource) streamNodeInfo;
List<FieldInfo> fieldInfos = parseStreamFieldInfos(icebergSource.getFieldList(), icebergSource.getSourceName());
List<FieldInfo> fieldInfos = parseStreamFieldInfos(icebergSource.getFieldList(), icebergSource.getSourceName(),
FIELD_TYPE_MAPPING_STRATEGY);
Map<String, String> properties = parseProperties(icebergSource.getProperties());

return new IcebergExtractNode(icebergSource.getSourceName(),
Expand All @@ -81,7 +86,8 @@ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
IcebergSink icebergSink = (IcebergSink) nodeInfo;
Map<String, String> properties = parseProperties(icebergSink.getProperties());
List<FieldInfo> fieldInfos = parseSinkFieldInfos(icebergSink.getSinkFieldList(), icebergSink.getSinkName());
List<FieldInfo> fieldInfos = parseSinkFieldInfos(icebergSink.getSinkFieldList(), icebergSink.getSinkName(),
FIELD_TYPE_MAPPING_STRATEGY);
List<FieldRelation> fieldRelations = parseSinkFields(icebergSink.getSinkFieldList(), constantFieldMap);
IcebergConstant.CatalogType catalogType = CatalogType.forName(icebergSink.getCatalogType());
Format format = parsingSinkMultipleFormat(icebergSink.getSinkMultipleEnable(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.fieldtype.strategy.ClickHouseFieldTypeStrategy;
import org.apache.inlong.manager.common.fieldtype.strategy.IcebergFieldTypeStrategy;
import org.apache.inlong.manager.common.fieldtype.strategy.MongoDBFieldTypeStrategy;
import org.apache.inlong.manager.common.fieldtype.strategy.MySQLFieldTypeStrategy;
import org.apache.inlong.manager.common.fieldtype.strategy.OracleFieldTypeStrategy;
Expand Down Expand Up @@ -124,4 +125,17 @@ public void testClickHouseFieldTypeInfo() {
TypeInfo typeInfo = fieldInfo.getFormatInfo().getTypeInfo();
Assertions.assertTrue(typeInfo instanceof ByteTypeInfo);
}

@Test
public void testIcebergFieldTypeInfo() {
StreamField streamField = new StreamField();
streamField.setIsMetaField(0);
streamField.setFieldName("time");
streamField.setFieldType("TIMESTAMP WITH TIMEZONE");
streamField.setFieldValue("2022-03-01T09:00:00 America/New_York");
FieldInfo fieldInfo = FieldInfoUtils.parseStreamFieldInfo(streamField,
"nodeId", new IcebergFieldTypeStrategy());
TypeInfo typeInfo = fieldInfo.getFormatInfo().getTypeInfo();
Assertions.assertTrue(typeInfo instanceof LocalZonedTimestampTypeInfo);
}
}

0 comments on commit bc73361

Please sign in to comment.