add tests and refactoring
kroushan-nit committed Dec 27, 2024
1 parent 356350a commit 970c2ef
Showing 5 changed files with 248 additions and 35 deletions.
TableFormatUtils.java
@@ -18,10 +18,15 @@

package org.apache.xtable.catalog;

import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.xtable.catalog.Constants.PROP_SPARK_SQL_SOURCES_PROVIDER;

import java.util.Map;

import org.apache.iceberg.TableProperties;

import com.google.common.base.Strings;

import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.storage.TableFormat;

@@ -32,6 +37,7 @@ public static String getTableDataLocation(
switch (tableFormat) {
case TableFormat.ICEBERG:
return getIcebergDataLocation(tableLocation, properties);
case TableFormat.DELTA:
case TableFormat.HUDI:
return tableLocation;
default:
@@ -54,4 +60,17 @@ private static String getIcebergDataLocation(
}
return dataLocation;
}

// Get table format name from table properties
public static String getTableFormat(Map<String, String> properties) {
// - In case of ICEBERG, the table_type param gives the table format
// - In case of DELTA, the table_type or spark.sql.sources.provider param gives the table format
// - In case of HUDI, the spark.sql.sources.provider param gives the table format
String tableFormat = properties.get(TABLE_TYPE_PROP);
if (Strings.isNullOrEmpty(tableFormat)) {
tableFormat = properties.get(PROP_SPARK_SQL_SOURCES_PROVIDER);
}
return tableFormat;
}
}
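For context, a minimal sketch (not part of the commit) of how a caller might exercise the new getTableFormat helper. The property keys below are the values the two constants resolve to (table_type for TABLE_TYPE_PROP, spark.sql.sources.provider for PROP_SPARK_SQL_SOURCES_PROVIDER); the example class and property maps are hypothetical.

import java.util.HashMap;
import java.util.Map;

import org.apache.xtable.catalog.TableFormatUtils;

public class TableFormatUtilsExample {
  public static void main(String[] args) {
    // Iceberg tables advertise their format through the table_type property.
    Map<String, String> icebergProps = new HashMap<>();
    icebergProps.put("table_type", "ICEBERG");

    // Hudi (and some Delta) tables only set spark.sql.sources.provider.
    Map<String, String> sparkProps = new HashMap<>();
    sparkProps.put("spark.sql.sources.provider", "hudi");

    System.out.println(TableFormatUtils.getTableFormat(icebergProps)); // prints ICEBERG
    System.out.println(TableFormatUtils.getTableFormat(sparkProps)); // prints hudi
  }
}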
HMSCatalogConversionSource.java
@@ -18,8 +18,6 @@

package org.apache.xtable.catalog.hms;

-import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;

import java.util.Locale;
import java.util.Properties;

@@ -30,6 +28,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.thrift.TException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;

import org.apache.xtable.catalog.TableFormatUtils;
@@ -54,20 +53,28 @@ public HMSCatalogConversionSource(
}
}

+  @VisibleForTesting
+  HMSCatalogConversionSource(HMSCatalogConfig hmsCatalogConfig, IMetaStoreClient metaStoreClient) {
+    this.hmsCatalogConfig = hmsCatalogConfig;
+    this.metaStoreClient = metaStoreClient;
+  }
+
@Override
public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
try {
Table table =
metaStoreClient.getTable(
tableIdentifier.getDatabaseName(), tableIdentifier.getTableName());
if (table == null) {
throw new IllegalStateException(String.format("table: %s not found", tableIdentifier));
throw new IllegalStateException(String.format("table: %s is null", tableIdentifier));
}

-      String tableFormat = table.getParameters().get(TABLE_TYPE_PROP).toUpperCase(Locale.ENGLISH);
-      if (!Strings.isNullOrEmpty(tableFormat)) {
-        throw new IllegalStateException("TableFormat must not be null or empty");
+      String tableFormat = TableFormatUtils.getTableFormat(table.getParameters());
+      if (Strings.isNullOrEmpty(tableFormat)) {
+        throw new IllegalStateException(
+            String.format("TableFormat is null or empty for table: %s", tableIdentifier));
      }
+      tableFormat = tableFormat.toUpperCase(Locale.ENGLISH);

String tableLocation = table.getSd().getLocation();
String dataPath =
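The refactored getSourceTable flow is: fetch the table, null-check it, resolve the format with TableFormatUtils.getTableFormat, fail fast if it is null or empty, then upper-case it before building the SourceTable. A hedged usage sketch, mirroring the test setup below; hmsCatalogConfig and metaStoreClient stand in for real, configured instances:

CatalogTableIdentifier identifier =
    CatalogTableIdentifier.builder().databaseName("hms_db").tableName("hms_tbl").build();
// Package-private, visible-for-testing constructor; production callers go
// through the public constructor shown above.
HMSCatalogConversionSource source =
    new HMSCatalogConversionSource(hmsCatalogConfig, metaStoreClient);
SourceTable sourceTable = source.getSourceTable(identifier);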
HMSClientProvider.java
@@ -31,8 +31,6 @@
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;

-import org.apache.hudi.common.util.VisibleForTesting;

@Log4j2
public class HMSClientProvider {

@@ -44,7 +42,6 @@ public HMSClientProvider(HMSCatalogConfig hmsCatalogConfig, Configuration config
this.configuration = configuration;
}

-  @VisibleForTesting
public IMetaStoreClient getMSC() throws MetaException, HiveException {
HiveConf hiveConf = new HiveConf(configuration, HiveConf.class);
hiveConf.set(METASTOREURIS.varname, hmsCatalogConfig.getServerUrl());
TestHMSCatalogConversionSource.java
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog.hms;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

import lombok.SneakyThrows;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.exception.CatalogSyncException;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.storage.TableFormat;

@ExtendWith(MockitoExtension.class)
class TestHMSCatalogConversionSource {

@Mock private HMSCatalogConfig mockCatalogConfig;
@Mock private IMetaStoreClient mockMetaStoreClient;
private HMSCatalogConversionSource catalogConversionSource;
private static final String HMS_DB = "hms_db";
private static final String HMS_TABLE = "hms_tbl";
private static final String TABLE_BASE_PATH = "/var/data/table";
private final CatalogTableIdentifier tableIdentifier =
CatalogTableIdentifier.builder().databaseName(HMS_DB).tableName(HMS_TABLE).build();

@BeforeEach
void init() {
catalogConversionSource =
new HMSCatalogConversionSource(mockCatalogConfig, mockMetaStoreClient);
}

@SneakyThrows
@Test
void testGetSourceTable_errorGettingTableFromHMS() {
// error getting table from hms
when(mockMetaStoreClient.getTable(HMS_DB, HMS_TABLE))
.thenThrow(new TException("something went wrong"));
assertThrows(
CatalogSyncException.class, () -> catalogConversionSource.getSourceTable(tableIdentifier));

verify(mockMetaStoreClient, times(1)).getTable(HMS_DB, HMS_TABLE);
}

@SneakyThrows
@Test
void testGetSourceTable_tableNotFoundInHMS() {
// table not found in hms
when(mockMetaStoreClient.getTable(HMS_DB, HMS_TABLE))
.thenThrow(new NoSuchObjectException("table not found"));
assertThrows(
CatalogSyncException.class, () -> catalogConversionSource.getSourceTable(tableIdentifier));

verify(mockMetaStoreClient, times(1)).getTable(HMS_DB, HMS_TABLE);
}

@SneakyThrows
@Test
void testGetSourceTable_tableFormatNotPresent() {
// table format not present in table properties
when(mockMetaStoreClient.getTable(HMS_DB, HMS_TABLE))
.thenReturn(newHmsTable(HMS_DB, HMS_TABLE, Collections.emptyMap(), null));
IllegalStateException exception =
assertThrows(
IllegalStateException.class,
() -> catalogConversionSource.getSourceTable(tableIdentifier));
assertEquals("TableFormat is null or empty for table: hms_db.hms_tbl", exception.getMessage());

verify(mockMetaStoreClient, times(1)).getTable(HMS_DB, HMS_TABLE);
}

@SneakyThrows
@ParameterizedTest
@CsvSource(value = {"ICEBERG", "HUDI", "DELTA"})
void testGetSourceTable(String tableFormat) {
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation(TABLE_BASE_PATH);
Map<String, String> tableParams = new HashMap<>();
if (Objects.equals(tableFormat, TableFormat.ICEBERG)) {
tableParams.put("write.data.path", String.format("%s/iceberg", TABLE_BASE_PATH));
tableParams.put("table_type", tableFormat);
} else {
tableParams.put("spark.sql.sources.provider", tableFormat);
}

String dataPath =
tableFormat.equals(TableFormat.ICEBERG)
? String.format("%s/iceberg", TABLE_BASE_PATH)
: TABLE_BASE_PATH;
SourceTable expected =
newSourceTable(HMS_TABLE, TABLE_BASE_PATH, dataPath, tableFormat, tableParams);
when(mockMetaStoreClient.getTable(HMS_DB, HMS_TABLE))
.thenReturn(newHmsTable(HMS_DB, HMS_TABLE, tableParams, sd));
SourceTable output = catalogConversionSource.getSourceTable(tableIdentifier);
assertEquals(expected, output);
}

private Table newHmsTable(
String dbName, String tableName, Map<String, String> params, StorageDescriptor sd) {
Table table = new Table();
table.setDbName(dbName);
table.setTableName(tableName);
table.setParameters(params);
table.setSd(sd);
return table;
}

private SourceTable newSourceTable(
String tblName,
String basePath,
String dataPath,
String tblFormat,
Map<String, String> params) {
Properties tblProperties = new Properties();
tblProperties.putAll(params);
return SourceTable.builder()
.name(tblName)
.basePath(basePath)
.dataPath(dataPath)
.formatName(tblFormat)
.additionalProperties(tblProperties)
.build();
}
}