Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add iceberg_tables table function #24469

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.metastore;

import com.google.common.collect.ImmutableSet;
import io.trino.metastore.HivePrivilegeInfo.HivePrivilege;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.SchemaTableName;
Expand Down Expand Up @@ -67,6 +68,11 @@ default boolean useSparkTableStatistics()

raunaqmorarka marked this conversation as resolved.
Show resolved Hide resolved
List<TableInfo> getTables(String databaseName);

/**
 * Returns the names of the tables in {@code databaseName} whose table parameter
 * {@code parameterKey} has a value contained in {@code parameterValues}.
 * A table that does not define {@code parameterKey} never matches.
 *
 * @param databaseName database whose tables are inspected
 * @param parameterKey name of the table parameter to match on
 * @param parameterValues accepted values for the parameter; typed as {@code ImmutableSet}, which cannot
 * contain {@code null}, to mark that this API does not support filtering by a null parameter value
 * @return names of the matching tables
 */
List<String> getTableNamesWithParameters(String databaseName, String parameterKey, ImmutableSet<String> parameterValues);

void createDatabase(Database database);

void dropDatabase(String databaseName, boolean deleteData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.metastore.tracing;

import com.google.common.collect.ImmutableSet;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.trino.metastore.AcidOperation;
Expand Down Expand Up @@ -164,6 +165,20 @@ public List<TableInfo> getTables(String databaseName)
});
}

/**
 * Delegates {@code getTableNamesWithParameters} inside a span named
 * {@code HiveMetastore.getTableNamesWithParameters} and records the number of
 * returned table names on that span.
 */
@Override
public List<String> getTableNamesWithParameters(String databaseName, String parameterKey, ImmutableSet<String> parameterValues)
{
    // NOTE(review): the TABLE attribute carries the parameter key here, not a table name - confirm this is intended.
    Span tracingSpan = tracer.spanBuilder("HiveMetastore.getTableNamesWithParameters")
            .setAttribute(SCHEMA, databaseName)
            .setAttribute(TABLE, parameterKey)
            .startSpan();

    return withTracing(tracingSpan, () -> {
        List<String> matchingTableNames = delegate.getTableNamesWithParameters(databaseName, parameterKey, parameterValues);
        // Attach the result size so the trace shows how many tables matched.
        tracingSpan.setAttribute(TABLE_RESPONSE_COUNT, matchingTableNames.size());
        return matchingTableNames;
    });
}

@Override
public void createDatabase(Database database)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public enum ObjectType
private final LoadingCache<HiveTableName, Optional<Table>> tableCache;
private final LoadingCache<String, List<TableInfo>> tablesCacheNew;
private final Cache<HiveTableName, AtomicReference<Map<String, HiveColumnStatistics>>> tableColumnStatisticsCache;
private final LoadingCache<TablesWithParameterCacheKey, List<String>> tableNamesWithParametersCache;
private final Cache<HivePartitionName, AtomicReference<Map<String, HiveColumnStatistics>>> partitionStatisticsCache;
private final Cache<HivePartitionName, AtomicReference<Optional<Partition>>> partitionCache;
private final LoadingCache<PartitionFilter, Optional<List<String>>> partitionFilterCache;
Expand Down Expand Up @@ -206,6 +207,7 @@ private CachingHiveMetastore(
tablesCacheNew = cacheFactory.buildCache(this::loadTablesNew);
tableColumnStatisticsCache = statsCacheFactory.buildCache(this::refreshTableColumnStatistics);
tableCache = cacheFactory.buildCache(this::loadTable);
tableNamesWithParametersCache = cacheFactory.buildCache(this::loadTablesMatchingParameter);
tablePrivilegesCache = cacheFactory.buildCache(key -> loadTablePrivileges(key.database(), key.table(), key.owner(), key.principal()));
rolesCache = cacheFactory.buildCache(_ -> loadRoles());
roleGrantsCache = cacheFactory.buildCache(this::loadRoleGrants);
Expand All @@ -223,6 +225,7 @@ public void flushCache()
tablesCacheNew.invalidateAll();
databaseCache.invalidateAll();
tableCache.invalidateAll();
tableNamesWithParametersCache.invalidateAll();
partitionCache.invalidateAll();
partitionFilterCache.invalidateAll();
tablePrivilegesCache.invalidateAll();
Expand Down Expand Up @@ -565,6 +568,18 @@ private List<TableInfo> loadTablesNew(String databaseName)
return delegate.getTables(databaseName);
}

/**
 * Looks up the matching table names through {@code tableNamesWithParametersCache},
 * keyed by the combination of database name, parameter key and accepted values.
 */
@Override
public List<String> getTableNamesWithParameters(String databaseName, String parameterKey, ImmutableSet<String> parameterValues)
{
    return get(
            tableNamesWithParametersCache,
            new TablesWithParameterCacheKey(databaseName, parameterKey, parameterValues));
}

/**
 * Loader for {@code tableNamesWithParametersCache}: unpacks the cache key and
 * forwards the lookup to the delegate metastore.
 */
private List<String> loadTablesMatchingParameter(TablesWithParameterCacheKey key)
{
    String databaseName = key.databaseName();
    String parameterKey = key.parameterKey();
    ImmutableSet<String> parameterValues = key.parameterValues();
    return delegate.getTableNamesWithParameters(databaseName, parameterKey, parameterValues);
}

@Override
public void createDatabase(Database database)
{
Expand Down Expand Up @@ -733,6 +748,7 @@ public void invalidateTable(String databaseName, String tableName)
HiveTableName hiveTableName = new HiveTableName(databaseName, tableName);
tableCache.invalidate(hiveTableName);
tablesCacheNew.invalidate(databaseName);
tableNamesWithParametersCache.invalidateAll();
invalidateAllIf(tablePrivilegesCache, userTableKey -> userTableKey.matches(databaseName, tableName));
tableColumnStatisticsCache.invalidate(hiveTableName);
invalidatePartitionCache(databaseName, tableName);
Expand Down Expand Up @@ -1153,6 +1169,16 @@ private static <K, V> Cache<K, AtomicReference<V>> buildBulkCache(
return cacheBuilder.build();
}

/**
 * Key type of {@code tableNamesWithParametersCache}: one entry per combination of
 * database name, parameter key and set of accepted parameter values.
 */
record TablesWithParameterCacheKey(String databaseName, String parameterKey, ImmutableSet<String> parameterValues)
{
// Compact constructor: every component is required, so a null fails fast when the key is built.
TablesWithParameterCacheKey
{
requireNonNull(databaseName, "databaseName is null");
requireNonNull(parameterKey, "parameterKey is null");
requireNonNull(parameterValues, "parameterValues is null");
lukasz-stec marked this conversation as resolved.
Show resolved Hide resolved
}
}

record UserTableKey(Optional<HivePrincipal> principal, String database, String table, Optional<String> owner)
{
UserTableKey
Expand Down Expand Up @@ -1201,6 +1227,13 @@ public CacheStatsMBean getTableNamesStats()
return new CacheStatsMBean(tablesCacheNew);
}

/**
 * Exposes statistics of {@code tableNamesWithParametersCache} as a managed, nested bean.
 */
@Managed
@Nested
public CacheStatsMBean getTableWithParameterStats()
{
    CacheStatsMBean tableWithParameterStats = new CacheStatsMBean(tableNamesWithParametersCache);
    return tableWithParameterStats;
}

@Managed
@Nested
public CacheStatsMBean getTableColumnStatisticsStats()
Expand Down Expand Up @@ -1275,6 +1308,11 @@ LoadingCache<HiveTableName, Optional<Table>> getTableCache()
return tableCache;
}

/**
 * Package-private accessor for the cache backing {@code getTableNamesWithParameters}.
 */
LoadingCache<TablesWithParameterCacheKey, List<String>> getTableNamesWithParametersCache()
{
    return this.tableNamesWithParametersCache;
}

public LoadingCache<String, List<TableInfo>> getTablesCacheNew()
{
return tablesCacheNew;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,13 @@ public AggregateCacheStatsMBean getTablesStats()
return new AggregateCacheStatsMBean(CachingHiveMetastore::getTablesCacheNew);
}

/**
 * Exposes aggregated statistics over the table-names-with-parameters cache of each
 * {@code CachingHiveMetastore} as a managed, nested bean.
 */
@Managed
@Nested
public AggregateCacheStatsMBean getTableWithParameterStats()
{
    return new AggregateCacheStatsMBean(metastore -> metastore.getTableNamesWithParametersCache());
}

@Managed
@Nested
public AggregateCacheStatsMBean getTableColumnStatisticsCache()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -177,7 +178,7 @@ public FileHiveMetastore(NodeVersion nodeVersion, TrinoFileSystemFactory fileSys

listTablesCache = EvictableCacheBuilder.newBuilder()
.expireAfterWrite(10, SECONDS)
.build(CacheLoader.from(this::doListAllTables));
.build(CacheLoader.from(databaseName -> doListAllTables(databaseName, _ -> true)));
}

@Override
Expand Down Expand Up @@ -532,7 +533,16 @@ private List<TableInfo> listAllTables(String databaseName)
return listTablesCache.getUnchecked(databaseName);
}

private synchronized List<TableInfo> doListAllTables(String databaseName)
@Override
public synchronized List<String> getTableNamesWithParameters(String databaseName, String parameterKey, ImmutableSet<String> parameterValues)
{
requireNonNull(parameterKey, "parameterKey is null");
return doListAllTables(databaseName, table -> parameterValues.contains(table.getParameters().get(parameterKey))).stream()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to handle the case where table.getParameters().get(parameterKey) is null separately? That is potentially a NullPointerException.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the type of the parameterValues to ImmutableSet to avoid this problem.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused about how changing the type helps.
I was thinking about the case where given parameter key does not exist in table parameters. I think the safe thing to do would be to return empty list of table names in that case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A Set can contain null values so theoretically an API with Set could mean a caller would like to filter for null or non-existent values. ImmutableSet cannot contain null, and additionally, it is safe to check for null existence there. This means that ImmutableSet better describes the API here which only supports non-null values for the filter

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems a subtle distinction, it would be easy for users of the API to miss it.
Can we just handle that corner case safely instead ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can they miss it? With ImmutableSet there is no way to pass the wrong argument

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't pass in a key for which there is no entry in table parameters ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They can; that is actually the point of this: to return only those tables that have it. But that works OK because the parameter values cannot contain null, so a potential mistake of trying to filter by a null value will simply not work, and it is safe to check for a null element because ImmutableSet, like all immutable collections, explicitly allows a null argument to com.google.common.collect.ImmutableCollection#contains.

public abstract boolean contains(@CheckForNull Object object);

.map(tableInfo -> tableInfo.tableName().getTableName())
.collect(toImmutableList());
}

private synchronized List<TableInfo> doListAllTables(String databaseName, Predicate<TableMetadata> tableMetadataPredicate)
{
requireNonNull(databaseName, "databaseName is null");

Expand All @@ -557,7 +567,8 @@ private synchronized List<TableInfo> doListAllTables(String databaseName)
Location schemaFileLocation = subdirectory.appendPath(TRINO_SCHEMA_FILE_NAME_SUFFIX);
readFile("table schema", schemaFileLocation, tableCodec).ifPresent(tableMetadata -> {
checkVersion(tableMetadata.getWriterVersion());
if (hideDeltaLakeTables && DELTA_LAKE_PROVIDER.equals(tableMetadata.getParameters().get(SPARK_TABLE_PROVIDER_KEY))) {
if ((hideDeltaLakeTables && DELTA_LAKE_PROVIDER.equals(tableMetadata.getParameters().get(SPARK_TABLE_PROVIDER_KEY)))
|| !tableMetadataPredicate.test(tableMetadata)) {
return;
}
tables.add(new TableInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,10 +412,21 @@ public void setDatabaseOwner(String databaseName, HivePrincipal principal)
/**
 * Lists the tables of {@code databaseName} through {@code glueCache}.
 * The loader handed to the cache calls {@code getTablesInternal} with an
 * always-true filter, so no table is excluded by parameter value.
 *
 * @param databaseName database whose tables are listed
 * @return the tables returned by {@code glueCache} for the database
 */
@Override
public List<TableInfo> getTables(String databaseName)
{
    // Only the three-argument getTablesInternal exists; the stale two-argument call was removed
    // because it left this statement unreachable.
    return glueCache.getTables(databaseName, cacheTable -> getTablesInternal(cacheTable, databaseName, _ -> true));
}

private List<TableInfo> getTablesInternal(Consumer<Table> cacheTable, String databaseName)
@Override
public List<String> getTableNamesWithParameters(String databaseName, String parameterKey, ImmutableSet<String> parameterValues)
{
return getTablesInternal(
_ -> {},
databaseName,
table -> table.parameters() != null && parameterValues.contains(table.parameters().get(parameterKey))).stream()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to handle the case where table.parameters().get(parameterKey) is null separately? That is potentially a NullPointerException.

.map(tableInfo -> tableInfo.tableName().getTableName())
.collect(toImmutableList());
}

private List<TableInfo> getTablesInternal(Consumer<Table> cacheTable, String databaseName, Predicate<software.amazon.awssdk.services.glue.model.Table> filter)
{
try {
ImmutableList<software.amazon.awssdk.services.glue.model.Table> glueTables = stats.getGetTables()
Expand All @@ -425,6 +436,7 @@ private List<TableInfo> getTablesInternal(Consumer<Table> cacheTable, String dat
.map(GetTablesResponse::tableList)
.flatMap(List::stream))
.filter(tableVisibilityFilter)
.filter(filter)
.collect(toImmutableList());

// Store only valid tables in cache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,25 @@ public List<TableInfo> getTables(String databaseName)
}
}

@Override
public List<String> getTableNamesWithParameters(String databaseName, String parameterKey, ImmutableSet<String> parameterValues)
{
try {
return getGlueTables(databaseName)
.filter(tableFilter)
.filter(table -> parameterValues.contains(getTableParameters(table).get(parameterKey)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar question as above about the null case

.map(com.amazonaws.services.glue.model.Table::getName)
.collect(toImmutableList());
}
catch (EntityNotFoundException | AccessDeniedException e) {
// database does not exist or permission denied
return ImmutableList.of();
}
catch (AmazonServiceException e) {
throw new TrinoException(HIVE_METASTORE_ERROR, e);
}
}

@Override
public Optional<Table> getTable(String databaseName, String tableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.hive.thrift.metastore.FieldSchema;
import io.trino.metastore.AcidOperation;
import io.trino.metastore.AcidTransactionOwner;
Expand Down Expand Up @@ -152,6 +153,12 @@ public List<TableInfo> getTables(String databaseName)
.collect(toImmutableList());
}

/**
 * Forwards the lookup unchanged to the wrapped delegate and returns its result.
 */
@Override
public List<String> getTableNamesWithParameters(String databaseName, String parameterKey, ImmutableSet<String> parameterValues)
{
    List<String> matchingTableNames = delegate.getTableNamesWithParameters(databaseName, parameterKey, parameterValues);
    return matchingTableNames;
}

@Override
public void createDatabase(Database database)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class DefaultThriftMetastoreClientFactory

private final MetastoreSupportsDateStatistics metastoreSupportsDateStatistics = new MetastoreSupportsDateStatistics();
private final AtomicInteger chosenGetTableAlternative = new AtomicInteger(Integer.MAX_VALUE);
private final AtomicInteger chosenTableParamAlternative = new AtomicInteger(Integer.MAX_VALUE);
private final AtomicInteger chosenAlterTransactionalTableAlternative = new AtomicInteger(Integer.MAX_VALUE);
private final AtomicInteger chosenAlterPartitionsAlternative = new AtomicInteger(Integer.MAX_VALUE);

Expand Down Expand Up @@ -115,6 +116,7 @@ protected ThriftMetastoreClient create(TransportSupplier transportSupplier, Stri
metastoreSupportsDateStatistics,
true,
chosenGetTableAlternative,
chosenTableParamAlternative,
chosenAlterTransactionalTableAlternative,
chosenAlterPartitionsAlternative);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -92,6 +93,13 @@ public List<TableMeta> getTableMeta(String databaseName)
return runWithHandle(() -> delegate.getTableMeta(databaseName));
}

/**
 * Runs the delegate's {@code getTableNamesWithParameters} through {@code runWithHandle}
 * and returns whatever that call yields.
 */
@Override
public List<String> getTableNamesWithParameters(String databaseName, String parameterKey, Set<String> parameterValues)
        throws TException
{
    return runWithHandle(() -> {
        return delegate.getTableNamesWithParameters(databaseName, parameterKey, parameterValues);
    });
}

@Override
public void createDatabase(Database database)
throws TException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class HttpThriftMetastoreClientFactory
private final OpenTelemetry openTelemetry;

private final AtomicInteger chosenGetTableAlternative = new AtomicInteger(Integer.MAX_VALUE);
private final AtomicInteger chosenGetTableParamAlternative = new AtomicInteger(Integer.MAX_VALUE);
private final AtomicInteger chosenAlterTransactionalTableAlternative = new AtomicInteger(Integer.MAX_VALUE);
private final AtomicInteger chosenAlterPartitionsAlternative = new AtomicInteger(Integer.MAX_VALUE);

Expand Down Expand Up @@ -85,6 +86,7 @@ public ThriftMetastoreClient create(URI uri, Optional<String> delegationToken)
new MetastoreSupportsDateStatistics(),
false,
chosenGetTableAlternative,
chosenGetTableParamAlternative,
chosenAlterTransactionalTableAlternative,
chosenAlterPartitionsAlternative);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,30 @@ public List<TableMeta> getTables(String databaseName)
}
}

/**
 * Asks the Thrift metastore for the table names matching the given parameter filter,
 * using the retry policy built by {@code retry()}.
 * <p>
 * A {@code NoSuchObjectException} stops the retries and yields an empty list, any other
 * {@code TException} is wrapped in a {@code TrinoException} with
 * {@code HIVE_METASTORE_ERROR}, and every remaining exception goes through {@code propagate}.
 */
@Override
public List<String> getTableNamesWithParameters(String databaseName, String parameterKey, Set<String> parameterValues)
{
    try {
        List<String> matchingTableNames = retry()
                .stopOn(NoSuchObjectException.class)
                .stopOnIllegalExceptions()
                .run("getTableNamesWithParameters", () -> {
                    // A fresh client per attempt, closed as soon as the call returns or fails.
                    try (ThriftMetastoreClient metastoreClient = createMetastoreClient()) {
                        return metastoreClient.getTableNamesWithParameters(databaseName, parameterKey, parameterValues);
                    }
                });
        return matchingTableNames;
    }
    catch (NoSuchObjectException e) {
        return ImmutableList.of();
    }
    catch (TException e) {
        throw new TrinoException(HIVE_METASTORE_ERROR, e);
    }
    catch (Exception e) {
        throw propagate(e);
    }
}

@Override
public Optional<Table> getTable(String databaseName, String tableName)
{
Expand Down
Loading
Loading