diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java index e2a32b4f3f0a..6f442e767244 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect; public class DatabaseIdentifier { + public static final String GENERIC = "Generic"; public static final String DB_2 = "DB2"; public static final String DAMENG = "Dameng"; public static final String GBASE_8A = "Gbase8a"; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialect.java new file mode 100644 index 000000000000..36c93d654694 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialect.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect; + +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; + +import lombok.extern.slf4j.Slf4j; + +import java.util.Optional; + +@Slf4j +public class GenericDialect implements JdbcDialect { + + public String fieldIde = FieldIdeEnum.ORIGINAL.getValue(); + + public GenericDialect() {} + + public GenericDialect(String fieldIde) { + this.fieldIde = fieldIde; + } + + @Override + public String dialectName() { + return DatabaseIdentifier.GENERIC; + } + + @Override + public JdbcRowConverter getRowConverter() { + return new AbstractJdbcRowConverter() { + @Override + public String converterName() { + return DatabaseIdentifier.GENERIC; + } + }; + } + + @Override + public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { + return new GenericTypeMapper(); + } + + @Override + public String quoteIdentifier(String identifier) { + return getFieldIde(identifier, fieldIde); + } + + @Override + public String quoteDatabaseIdentifier(String identifier) { + return identifier; + } + + @Override + public String tableIdentifier(TablePath tablePath) { + return tableIdentifier(tablePath.getDatabaseName(), tablePath.getTableName()); + } + + @Override + public Optional getUpsertStatement( + String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) { + throw new UnsupportedOperationException(); + } + + @Override + public TablePath parse(String tablePath) { + return TablePath.of(tablePath, false); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialectFactory.java new file mode 100644 index 000000000000..67a7360d01e9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialectFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect; + +import com.google.auto.service.AutoService; + +import javax.annotation.Nonnull; + +/** Factory for {@link GenericDialect}. */ +@AutoService(JdbcDialectFactory.class) +public class GenericDialectFactory implements JdbcDialectFactory { + @Override + public boolean acceptsURL(String url) { + return true; + } + + @Override + public JdbcDialect create() { + return new GenericDialect(); + } + + @Override + public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) { + return new GenericDialect(fieldIde); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeConverter.java new file mode 100644 index 000000000000..8c4555697a4c --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeConverter.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.api.table.converter.TypeConverter; +import org.apache.seatunnel.common.exception.CommonError; + +import com.google.auto.service.AutoService; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@AutoService(TypeConverter.class) +public class GenericTypeConverter implements TypeConverter { + + public static final GenericTypeConverter DEFAULT_INSTANCE = new GenericTypeConverter(); + + @Override + public String identifier() { + return DatabaseIdentifier.GENERIC; + } + + /** + * Convert an external system's type definition to {@link Column}. + * + * @param typeDefine type define + * @return column + */ + @Override + public Column convert(BasicTypeDefine typeDefine) { + PhysicalColumn.PhysicalColumnBuilder builder = + PhysicalColumn.builder() + .name(typeDefine.getName()) + .nullable(typeDefine.isNullable()) + .defaultValue(typeDefine.getDefaultValue()) + .comment(typeDefine.getComment()); + String dataType = typeDefine.getDataType().toUpperCase(); + switch (dataType) { + case "BOOLEAN": + } + + return null; + } + + /** + * Convert {@link Column} to an external system's type definition. + * + * @param column + * @return + */ + @Override + public BasicTypeDefine reconvert(Column column) { + BasicTypeDefine.BasicTypeDefineBuilder builder = + BasicTypeDefine.builder() + .name(column.getName()) + .nullable(column.isNullable()) + .comment(column.getComment()) + .defaultValue(column.getDefaultValue()); + switch (column.getDataType().getSqlType()) { + case BOOLEAN: + break; + case TINYINT: + break; + case SMALLINT: + break; + case INT: + break; + case BIGINT: + break; + case FLOAT: + break; + case DOUBLE: + break; + case DECIMAL: + break; + case STRING: + break; + case BYTES: + break; + case DATE: + break; + case TIME: + break; + case TIMESTAMP: + break; + default: + throw CommonError.convertToConnectorTypeError( + DatabaseIdentifier.GENERIC, + column.getDataType().getSqlType().name(), + column.getName()); + } + + return null; + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeMapper.java new file mode 100644 index 000000000000..bf33c40527c0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeMapper.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; + +public class GenericTypeMapper implements JdbcDialectTypeMapper { + + private GenericTypeConverter typeConverter; + + public GenericTypeMapper() { + this(GenericTypeConverter.DEFAULT_INSTANCE); + } + + public GenericTypeMapper(GenericTypeConverter typeConverter) { + this.typeConverter = typeConverter; + } + + @Override + public Column mappingColumn(BasicTypeDefine typeDefine) { + return typeConverter.convert(typeDefine); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java index 350a22e20c6c..7dc71835aec3 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java @@ -61,24 +61,17 @@ public static JdbcDialect load(String url, String compatibleMode, String fieldId JdbcDialectFactory.class.getName())); } - final List matchingFactories = + List matchingFactories = foundFactories.stream().filter(f -> f.acceptsURL(url)).collect(Collectors.toList()); - if (matchingFactories.isEmpty()) { - throw new JdbcConnectorException( - JdbcConnectorErrorCode.NO_SUITABLE_DIALECT_FACTORY, - String.format( - "Could not find any jdbc dialect factory that can handle url '%s' that implements '%s' in the classpath.\n\n" - + "Available factories are:\n\n" - + "%s", - url, - JdbcDialectFactory.class.getName(), - foundFactories.stream() - .map(f -> f.getClass().getName()) - .distinct() - .sorted() - .collect(Collectors.joining("\n")))); + // filter out generic dialect factory + if (matchingFactories.size() > 1) { + matchingFactories = + matchingFactories.stream() + .filter(f -> !(f instanceof GenericDialectFactory)) + .collect(Collectors.toList()); } + if (matchingFactories.size() > 1) { throw new JdbcConnectorException( JdbcConnectorErrorCode.NO_SUITABLE_DIALECT_FACTORY,