Skip to content

Commit

Permalink
add generic dialect apache#7956
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Nov 21, 2024
1 parent 69cd4ae commit b7fc979
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;

public class DatabaseIdentifier {
public static final String GENERIC = "Generic";
public static final String DB_2 = "DB2";
public static final String DAMENG = "Dameng";
public static final String GBASE_8A = "Gbase8a";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;

import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;

import lombok.extern.slf4j.Slf4j;

import java.util.Optional;

@Slf4j
public class GenericDialect implements JdbcDialect {

public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();

public GenericDialect() {}

public GenericDialect(String fieldIde) {
this.fieldIde = fieldIde;
}

@Override
public String dialectName() {
return DatabaseIdentifier.GENERIC;
}

@Override
public JdbcRowConverter getRowConverter() {
return new AbstractJdbcRowConverter() {
@Override
public String converterName() {
return DatabaseIdentifier.GENERIC;
}
};
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new GenericTypeMapper();
}

@Override
public String quoteIdentifier(String identifier) {
return getFieldIde(identifier, fieldIde);
}

@Override
public String quoteDatabaseIdentifier(String identifier) {
return identifier;
}

@Override
public String tableIdentifier(TablePath tablePath) {
return tableIdentifier(tablePath.getDatabaseName(), tablePath.getTableName());
}

@Override
public Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
throw new UnsupportedOperationException();
}

@Override
public TablePath parse(String tablePath) {
return TablePath.of(tablePath, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;

import com.google.auto.service.AutoService;

import javax.annotation.Nonnull;

/** Factory for {@link GenericDialect}. */
@AutoService(JdbcDialectFactory.class)
public class GenericDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
return true;
}

@Override
public JdbcDialect create() {
return new GenericDialect();
}

@Override
public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
return new GenericDialect(fieldIde);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.common.exception.CommonError;

import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@AutoService(TypeConverter.class)
public class GenericTypeConverter implements TypeConverter<BasicTypeDefine> {

public static final GenericTypeConverter DEFAULT_INSTANCE = new GenericTypeConverter();

@Override
public String identifier() {
return DatabaseIdentifier.GENERIC;
}

/**
* Convert an external system's type definition to {@link Column}.
*
* @param typeDefine type define
* @return column
*/
@Override
public Column convert(BasicTypeDefine typeDefine) {
PhysicalColumn.PhysicalColumnBuilder builder =
PhysicalColumn.builder()
.name(typeDefine.getName())
.nullable(typeDefine.isNullable())
.defaultValue(typeDefine.getDefaultValue())
.comment(typeDefine.getComment());
String dataType = typeDefine.getDataType().toUpperCase();
switch (dataType) {
case "BOOLEAN":
}

return null;
}

/**
* Convert {@link Column} to an external system's type definition.
*
* @param column
* @return
*/
@Override
public BasicTypeDefine reconvert(Column column) {
BasicTypeDefine.BasicTypeDefineBuilder builder =
BasicTypeDefine.builder()
.name(column.getName())
.nullable(column.isNullable())
.comment(column.getComment())
.defaultValue(column.getDefaultValue());
switch (column.getDataType().getSqlType()) {
case BOOLEAN:
break;
case TINYINT:
break;
case SMALLINT:
break;
case INT:
break;
case BIGINT:
break;
case FLOAT:
break;
case DOUBLE:
break;
case DECIMAL:
break;
case STRING:
break;
case BYTES:
break;
case DATE:
break;
case TIME:
break;
case TIMESTAMP:
break;
default:
throw CommonError.convertToConnectorTypeError(
DatabaseIdentifier.GENERIC,
column.getDataType().getSqlType().name(),
column.getName());
}

return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;

public class GenericTypeMapper implements JdbcDialectTypeMapper {

private GenericTypeConverter typeConverter;

public GenericTypeMapper() {
this(GenericTypeConverter.DEFAULT_INSTANCE);
}

public GenericTypeMapper(GenericTypeConverter typeConverter) {
this.typeConverter = typeConverter;
}

@Override
public Column mappingColumn(BasicTypeDefine typeDefine) {
return typeConverter.convert(typeDefine);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,24 +61,17 @@ public static JdbcDialect load(String url, String compatibleMode, String fieldId
JdbcDialectFactory.class.getName()));
}

final List<JdbcDialectFactory> matchingFactories =
List<JdbcDialectFactory> matchingFactories =
foundFactories.stream().filter(f -> f.acceptsURL(url)).collect(Collectors.toList());

if (matchingFactories.isEmpty()) {
throw new JdbcConnectorException(
JdbcConnectorErrorCode.NO_SUITABLE_DIALECT_FACTORY,
String.format(
"Could not find any jdbc dialect factory that can handle url '%s' that implements '%s' in the classpath.\n\n"
+ "Available factories are:\n\n"
+ "%s",
url,
JdbcDialectFactory.class.getName(),
foundFactories.stream()
.map(f -> f.getClass().getName())
.distinct()
.sorted()
.collect(Collectors.joining("\n"))));
// filter out generic dialect factory
if (matchingFactories.size() > 1) {
matchingFactories =
matchingFactories.stream()
.filter(f -> !(f instanceof GenericDialectFactory))
.collect(Collectors.toList());
}

if (matchingFactories.size() > 1) {
throw new JdbcConnectorException(
JdbcConnectorErrorCode.NO_SUITABLE_DIALECT_FACTORY,
Expand Down

0 comments on commit b7fc979

Please sign in to comment.