From bc7336176dcd0f4ee1e43da140b38a34876946f4 Mon Sep 17 00:00:00 2001 From: chestnufang <65438734+chestnut-c@users.noreply.github.com> Date: Tue, 2 Jan 2024 09:59:13 +0800 Subject: [PATCH] [INLONG-9508][Manager] Add Iceberg field type mapping strategy to improve usability (#9545) Co-authored-by: chestnufang --- .../strategy/IcebergFieldTypeStrategy.java | 43 +++++++++++++ .../resources/iceberg-field-type-mapping.yaml | 63 +++++++++++++++++++ .../sort/node/provider/IcebergProvider.java | 10 ++- .../pojo/sort/util/FieldInfoUtilsTest.java | 14 +++++ 4 files changed, 128 insertions(+), 2 deletions(-) create mode 100644 inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/IcebergFieldTypeStrategy.java create mode 100644 inlong-manager/manager-common/src/main/resources/iceberg-field-type-mapping.yaml diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/IcebergFieldTypeStrategy.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/IcebergFieldTypeStrategy.java new file mode 100644 index 00000000000..2555fc3e5e2 --- /dev/null +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/IcebergFieldTypeStrategy.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.common.fieldtype.strategy; + +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.fieldtype.FieldTypeMappingReader; + +import org.apache.commons.lang3.StringUtils; + +import static org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET; + +/** + * The iceberg field type mapping strategy + */ +public class IcebergFieldTypeStrategy implements FieldTypeMappingStrategy { + + private final FieldTypeMappingReader reader; + + public IcebergFieldTypeStrategy() { + this.reader = new FieldTypeMappingReader(DataNodeType.ICEBERG); + } + + @Override + public String getFieldTypeMapping(String sourceType) { + String dataType = StringUtils.substringBefore(sourceType, LEFT_BRACKET).toUpperCase(); + return reader.getFIELD_TYPE_MAPPING_MAP().getOrDefault(dataType, sourceType.toUpperCase()); + } +} diff --git a/inlong-manager/manager-common/src/main/resources/iceberg-field-type-mapping.yaml b/inlong-manager/manager-common/src/main/resources/iceberg-field-type-mapping.yaml new file mode 100644 index 00000000000..29508f9e56d --- /dev/null +++ b/inlong-manager/manager-common/src/main/resources/iceberg-field-type-mapping.yaml @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source.type.to.target.type.converter: + + - source.type: INTEGER + target.type: INT + + - source.type: FLOAT + target.type: FLOAT + + - source.type: DOUBLE + target.type: DOUBLE + + - source.type: LONG + target.type: LONG + + - source.type: DECIMAL + target.type: DECIMAL + + - source.type: BOOLEAN + target.type: BOOLEAN + + - source.type: DATE + target.type: DATE + + - source.type: TIME + target.type: TIME + + - source.type: TIMESTAMP + target.type: LOCAL_ZONE_TIMESTAMP + + - source.type: TIMESTAMP WITH TIMEZONE + target.type: LOCAL_ZONE_TIMESTAMP + + - source.type: TIMESTAMP WITHOUT TIMEZONE + target.type: TIMESTAMP + + - source.type: UUID + target.type: STRING + + - source.type: STRING + target.type: STRING + + - source.type: FIXED + target.type: VARBINARY + + - source.type: BINARY + target.type: BINARY \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java index 37302bc3968..669f7c22171 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java @@ -19,6 +19,8 @@ import org.apache.inlong.common.enums.MetaField; import org.apache.inlong.manager.common.consts.StreamType; +import org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy; +import org.apache.inlong.manager.common.fieldtype.strategy.IcebergFieldTypeStrategy; import org.apache.inlong.manager.pojo.sink.SinkField; import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink; import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider; @@ -51,6 +53,8 @@ @Slf4j public class IcebergProvider implements ExtractNodeProvider, LoadNodeProvider { + private static final FieldTypeMappingStrategy FIELD_TYPE_MAPPING_STRATEGY = new IcebergFieldTypeStrategy(); + @Override public Boolean accept(String sinkType) { return StreamType.ICEBERG.equals(sinkType); @@ -59,7 +63,8 @@ public Boolean accept(String sinkType) { @Override public ExtractNode createExtractNode(StreamNode streamNodeInfo) { IcebergSource icebergSource = (IcebergSource) streamNodeInfo; - List fieldInfos = parseStreamFieldInfos(icebergSource.getFieldList(), icebergSource.getSourceName()); + List fieldInfos = parseStreamFieldInfos(icebergSource.getFieldList(), icebergSource.getSourceName(), + FIELD_TYPE_MAPPING_STRATEGY); Map properties = parseProperties(icebergSource.getProperties()); return new IcebergExtractNode(icebergSource.getSourceName(), @@ -81,7 +86,8 @@ public ExtractNode createExtractNode(StreamNode streamNodeInfo) { public LoadNode createLoadNode(StreamNode nodeInfo, Map constantFieldMap) { IcebergSink icebergSink = (IcebergSink) nodeInfo; Map properties = parseProperties(icebergSink.getProperties()); - List fieldInfos = parseSinkFieldInfos(icebergSink.getSinkFieldList(), icebergSink.getSinkName()); + List fieldInfos = parseSinkFieldInfos(icebergSink.getSinkFieldList(), icebergSink.getSinkName(), + FIELD_TYPE_MAPPING_STRATEGY); List fieldRelations = parseSinkFields(icebergSink.getSinkFieldList(), constantFieldMap); IcebergConstant.CatalogType catalogType = CatalogType.forName(icebergSink.getCatalogType()); Format format = parsingSinkMultipleFormat(icebergSink.getSinkMultipleEnable(), diff --git a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtilsTest.java b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtilsTest.java index f1394c9b738..92fd73ace26 100644 --- a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtilsTest.java +++ b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtilsTest.java @@ -19,6 +19,7 @@ import org.apache.inlong.manager.common.enums.FieldType; import org.apache.inlong.manager.common.fieldtype.strategy.ClickHouseFieldTypeStrategy; +import org.apache.inlong.manager.common.fieldtype.strategy.IcebergFieldTypeStrategy; import org.apache.inlong.manager.common.fieldtype.strategy.MongoDBFieldTypeStrategy; import org.apache.inlong.manager.common.fieldtype.strategy.MySQLFieldTypeStrategy; import org.apache.inlong.manager.common.fieldtype.strategy.OracleFieldTypeStrategy; @@ -124,4 +125,17 @@ public void testClickHouseFieldTypeInfo() { TypeInfo typeInfo = fieldInfo.getFormatInfo().getTypeInfo(); Assertions.assertTrue(typeInfo instanceof ByteTypeInfo); } + + @Test + public void testIcebergFieldTypeInfo() { + StreamField streamField = new StreamField(); + streamField.setIsMetaField(0); + streamField.setFieldName("time"); + streamField.setFieldType("TIMESTAMP WITH TIMEZONE"); + streamField.setFieldValue("2022-03-01T09:00:00 America/New_York"); + FieldInfo fieldInfo = FieldInfoUtils.parseStreamFieldInfo(streamField, + "nodeId", new IcebergFieldTypeStrategy()); + TypeInfo typeInfo = fieldInfo.getFormatInfo().getTypeInfo(); + Assertions.assertTrue(typeInfo instanceof LocalZonedTimestampTypeInfo); + } }