Skip to content

Commit

Permalink
[590] Add Iceberg HMS Catalog Sync implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
kroushan-nit committed Dec 31, 2024
1 parent d60625c commit 25bf942
Show file tree
Hide file tree
Showing 30 changed files with 2,637 additions and 13 deletions.
11 changes: 7 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
<module>xtable-hudi-support</module>
<module>xtable-core</module>
<module>xtable-utilities</module>
<module>xtable-hive-metastore</module>
</modules>

<properties>
Expand All @@ -61,8 +62,10 @@
<junit.version>5.9.0</junit.version>
<lombok.version>1.18.30</lombok.version>
<lombok-maven-plugin.version>1.18.20.0</lombok-maven-plugin.version>
<hadoop.version>3.4.0</hadoop.version>
<hadoop.version>3.4.1</hadoop.version>
<hudi.version>0.14.0</hudi.version>
<aws.version>2.29.40</aws.version>
<hive.version>2.3.9</hive.version>
<maven-source-plugin.version>3.3.1</maven-source-plugin.version>
<maven-javadoc-plugin.version>3.8.0</maven-javadoc-plugin.version>
<maven-gpg-plugin.version>3.2.4</maven-gpg-plugin.version>
Expand Down Expand Up @@ -372,9 +375,9 @@
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>1.12.328</version>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bundle</artifactId>
<version>${aws.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,5 @@ public class ExternalCatalogConfig {
/**
* The properties for this catalog, used for providing any custom behaviour during catalog sync
*/
@NonNull @Builder.Default Map<String, String> catalogProperties = Collections.emptyMap();
@Builder.Default Map<String, String> catalogProperties = Collections.emptyMap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public enum ErrorCode {
UNSUPPORTED_SCHEMA_TYPE(10007),
UNSUPPORTED_FEATURE(10008),
PARSE_EXCEPTION(10009),
CATALOG_REFRESH_EXCEPTION(10010);
CATALOG_REFRESH_EXCEPTION(10010),
CATALOG_SYNC_GENERIC_EXCEPTION(10011);

private final int errorCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@
*/
public class CatalogType {
public static final String STORAGE = "STORAGE";
public static final String HMS = "HMS";
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ private <TABLE> CatalogSyncStatus syncCatalog(
CatalogSyncClient<TABLE> catalogSyncClient,
CatalogTableIdentifier tableIdentifier,
InternalTable table) {
log.info(
"Running catalog sync for table {} with format {} using catalogSync {}",
table.getBasePath(),
table.getTableFormat(),
catalogSyncClient.getClass().getName());
if (!catalogSyncClient.hasDatabase(tableIdentifier)) {
catalogSyncClient.createDatabase(tableIdentifier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class TestThreePartHierarchicalTableIdentifier {
class TestHierarchicalTableIdentifier {

@Test
void testGetId() {
ThreePartHierarchicalTableIdentifier catalogTableIdentifier =
void testToString() {
HierarchicalTableIdentifier catalogTableIdentifier =
ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier(
"catalogName.databaseName.tableName");
assertEquals("catalogName.databaseName.tableName", catalogTableIdentifier.getId());
Expand Down
4 changes: 4 additions & 0 deletions xtable-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
</dependency>

<!-- Junit -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog;

import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;

/**
* The interface for creating/updating catalog table object, each catalog can have its own
* implementation that can be plugged in.
*/
public interface CatalogTableBuilder<REQUEST, TABLE> {
public REQUEST getCreateTableRequest(InternalTable table, CatalogTableIdentifier tableIdentifier);

public REQUEST getUpdateTableRequest(
InternalTable table, TABLE catalogTable, CatalogTableIdentifier tableIdentifier);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog;

import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;

public class CatalogUtils {

public static HierarchicalTableIdentifier castToHierarchicalTableIdentifier(
CatalogTableIdentifier tableIdentifier) {
if (tableIdentifier instanceof HierarchicalTableIdentifier) {
return (HierarchicalTableIdentifier) tableIdentifier;
}
throw new IllegalArgumentException("Invalid tableIdentifier implementation");
}
}
27 changes: 27 additions & 0 deletions xtable-core/src/main/java/org/apache/xtable/catalog/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog;

public class Constants {

public static final String PROP_SPARK_SQL_SOURCES_PROVIDER = "spark.sql.sources.provider";
public static final String PROP_PATH = "path";
public static final String PROP_SERIALIZATION_FORMAT = "serialization.format";
public static final String PROP_EXTERNAL = "EXTERNAL";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog;

import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.xtable.catalog.Constants.PROP_SPARK_SQL_SOURCES_PROVIDER;

import java.util.Map;

import org.apache.iceberg.TableProperties;

import com.google.common.base.Strings;

import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.storage.TableFormat;

public class TableFormatUtils {

public static String getTableDataLocation(
String tableFormat, String tableLocation, Map<String, String> properties) {
switch (tableFormat) {
case TableFormat.ICEBERG:
return getIcebergDataLocation(tableLocation, properties);
case TableFormat.DELTA:
case TableFormat.HUDI:
return tableLocation;
default:
throw new NotSupportedException("Unsupported table format: " + tableFormat);
}
}

/** Get iceberg table data files location */
private static String getIcebergDataLocation(
String tableLocation, Map<String, String> properties) {
String dataLocation = properties.get(TableProperties.WRITE_DATA_LOCATION);
if (dataLocation == null) {
dataLocation = properties.get(TableProperties.WRITE_FOLDER_STORAGE_LOCATION);
if (dataLocation == null) {
dataLocation = properties.get(TableProperties.OBJECT_STORE_PATH);
if (dataLocation == null) {
dataLocation = String.format("%s/data", tableLocation);
}
}
}
return dataLocation;
}

// Get table format name from table properties
public static String getTableFormat(Map<String, String> properties) {
// - In case of ICEBERG, table_type param will give the table format
// - In case of DELTA, table_type or spark.sql.sources.provider param will give the table
// format
// - In case of HUDI, spark.sql.sources.provider param will give the table format
String tableFormat = properties.get(TABLE_TYPE_PROP);
if (Strings.isNullOrEmpty(tableFormat)) {
tableFormat = properties.get(PROP_SPARK_SQL_SOURCES_PROVIDER);
}
return tableFormat;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.exception;

import org.apache.xtable.model.exception.ErrorCode;
import org.apache.xtable.model.exception.InternalException;

public class CatalogSyncException extends InternalException {

public CatalogSyncException(ErrorCode errorCode, String message, Throwable e) {
super(errorCode, message, e);
}

public CatalogSyncException(String message, Throwable e) {
super(ErrorCode.CATALOG_SYNC_GENERIC_EXCEPTION, message, e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog;

import java.util.Collections;
import java.util.Map;

import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;

public class TestSchemaExtractorBase {
protected static InternalField getPrimitiveOneField(
String fieldName, String schemaName, InternalType dataType, boolean isNullable, int fieldId) {
return getPrimitiveOneField(
fieldName, schemaName, dataType, isNullable, fieldId, Collections.emptyMap());
}

protected static InternalField getPrimitiveOneField(
String fieldName,
String schemaName,
InternalType dataType,
boolean isNullable,
int fieldId,
String parentPath) {
return getPrimitiveOneField(
fieldName, schemaName, dataType, isNullable, fieldId, parentPath, Collections.emptyMap());
}

protected static InternalField getPrimitiveOneField(
String fieldName,
String schemaName,
InternalType dataType,
boolean isNullable,
int fieldId,
Map<InternalSchema.MetadataKey, Object> metadata) {
return getPrimitiveOneField(
fieldName, schemaName, dataType, isNullable, fieldId, null, metadata);
}

protected static InternalField getPrimitiveOneField(
String fieldName,
String schemaName,
InternalType dataType,
boolean isNullable,
int fieldId,
String parentPath,
Map<InternalSchema.MetadataKey, Object> metadata) {
return InternalField.builder()
.name(fieldName)
.parentPath(parentPath)
.schema(
InternalSchema.builder()
.name(schemaName)
.dataType(dataType)
.isNullable(isNullable)
.metadata(metadata)
.build())
.fieldId(fieldId)
.build();
}
}
Loading

0 comments on commit 25bf942

Please sign in to comment.