Skip to content

Commit

Permalink
Introduce redaction for sensitive statements
Browse files Browse the repository at this point in the history
This commit introduces redacting of security-sensitive information in
statements containing catalog properties, specifically:

* CREATE CATALOG
* EXPLAIN CREATE CATALOG
* EXPLAIN ANALYZE CREATE CATALOG

The current approach is as follows:

* For syntactically valid statements, only properties containing
sensitive information are masked.
* If a query is syntactically valid but retrieving security-sensitive
properties fails for any reason (e.g., the query references a
nonexistent connector or catalog property evaluation fails), all
properties are masked.
* If a query fails before or during parsing, the entire query is masked.

The redacted form is created right before initialization of the
QueryStateMachine and is propagated to all places that create QueryInfo
and BasicQueryInfo (e.g., REST endpoints, query events, and
the system.runtime.queries table). Before this change,
QueryInfo/BasicQueryInfo stored the raw query text received from the end
user. From now on, the text will be altered for the cases listed above.
  • Loading branch information
piotrrzysko committed Jan 13, 2025
1 parent 5f9b08a commit f15a34e
Show file tree
Hide file tree
Showing 38 changed files with 770 additions and 44 deletions.
14 changes: 14 additions & 0 deletions core/trino-main/src/main/java/io/trino/FeaturesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ public class FeaturesConfig

private boolean faultTolerantExecutionExchangeEncryptionEnabled = true;

private boolean statementRedactingEnabled = true;

public enum DataIntegrityVerification
{
NONE,
Expand Down Expand Up @@ -514,6 +516,18 @@ public FeaturesConfig setFaultTolerantExecutionExchangeEncryptionEnabled(boolean
return this;
}

public boolean isStatementRedactingEnabled()
{
return statementRedactingEnabled;
}

@Config("deprecated.statement-redacting-enabled")
public FeaturesConfig setStatementRedactingEnabled(boolean statementRedactingEnabled)
{
this.statementRedactingEnabled = statementRedactingEnabled;
return this;
}

public void applyFaultTolerantExecutionDefaults()
{
exchangeCompressionCodec = LZ4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.trino.spi.connector.ConnectorFactory;
import io.trino.spi.connector.ConnectorName;

import java.util.Set;

@ThreadSafe
public interface CatalogFactory
{
Expand All @@ -28,4 +30,6 @@ public interface CatalogFactory
CatalogConnector createCatalog(CatalogProperties catalogProperties);

CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorName connectorName, Connector connector);

Set<String> getSecuritySensitivePropertyNames(CatalogProperties catalogProperties);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.trino.connector;

import io.trino.spi.catalog.CatalogName;
import io.trino.spi.catalog.CatalogProperties;
import io.trino.spi.connector.ConnectorName;

import java.util.Map;

public interface CatalogPropertiesProvider
{
CatalogProperties getCatalogProperties(CatalogName catalogName, ConnectorName connectorName, Map<String, String> properties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.connector;

import com.google.common.collect.ImmutableSet;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.inject.Inject;
import io.airlift.configuration.secrets.SecretsResolver;
Expand Down Expand Up @@ -45,6 +46,7 @@

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

Expand Down Expand Up @@ -144,6 +146,25 @@ public CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorName
return createCatalog(catalogHandle, connectorName, connector, Optional.empty());
}

@Override
public Set<String> getSecuritySensitivePropertyNames(CatalogProperties catalogProperties)
{
ConnectorFactory connectorFactory = connectorFactories.get(catalogProperties.connectorName());
if (connectorFactory == null) {
// If someone tries to use a non-existent connector, we assume they
// misspelled the name and, for safety, we redact all the properties.
return ImmutableSet.copyOf(catalogProperties.properties().keySet());
}

ConnectorContext context = createConnectorContext(catalogProperties.catalogHandle());
String catalogName = catalogProperties.catalogHandle().getCatalogName().toString();
Map<String, String> config = secretsResolver.getResolvedConfiguration(catalogProperties.properties());

try (ThreadContextClassLoader _ = new ThreadContextClassLoader(connectorFactory.getClass().getClassLoader())) {
return connectorFactory.getSecuritySensitivePropertyNames(catalogName, config, context);
}
}

private CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorName connectorName, Connector connector, Optional<CatalogProperties> catalogProperties)
{
Tracer tracer = createTracer(catalogHandle);
Expand Down Expand Up @@ -196,7 +217,16 @@ private Connector createConnector(
ConnectorFactory connectorFactory,
Map<String, String> properties)
{
ConnectorContext context = new ConnectorContextInstance(
ConnectorContext context = createConnectorContext(catalogHandle);

try (ThreadContextClassLoader _ = new ThreadContextClassLoader(connectorFactory.getClass().getClassLoader())) {
return connectorFactory.create(catalogName, properties, context);
}
}

private ConnectorContext createConnectorContext(CatalogHandle catalogHandle)
{
return new ConnectorContextInstance(
catalogHandle,
openTelemetry,
createTracer(catalogHandle),
Expand All @@ -206,10 +236,6 @@ private Connector createConnector(
new InternalMetadataProvider(metadata, typeManager),
pageSorter,
pageIndexerFactory);

try (ThreadContextClassLoader _ = new ThreadContextClassLoader(connectorFactory.getClass().getClassLoader())) {
return connectorFactory.create(catalogName, properties, context);
}
}

private Tracer createTracer(CatalogHandle catalogHandle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ protected void setup(Binder binder)

configBinder(binder).bindConfig(CatalogPruneTaskConfig.class);
binder.bind(CatalogPruneTask.class).in(Scopes.SINGLETON);

binder.bind(CatalogPropertiesProvider.class).to(DynamicCatalogPropertiesProvider.class).in(Scopes.SINGLETON);
}
else {
binder.bind(WorkerDynamicCatalogManager.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.trino.connector;

import com.google.inject.Inject;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.catalog.CatalogProperties;
import io.trino.spi.catalog.CatalogStore;
import io.trino.spi.connector.ConnectorName;

import java.util.Map;

import static java.util.Objects.requireNonNull;

public class DynamicCatalogPropertiesProvider
implements CatalogPropertiesProvider
{
private final CatalogStore catalogStore;

@Inject
public DynamicCatalogPropertiesProvider(CatalogStore catalogStore)
{
this.catalogStore = requireNonNull(catalogStore, "catalogStore is null");
}

@Override
public CatalogProperties getCatalogProperties(CatalogName catalogName, ConnectorName connectorName, Map<String, String> properties)
{
return catalogStore.createCatalogProperties(catalogName, connectorName, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.spi.connector.ConnectorFactory;
import io.trino.spi.connector.ConnectorName;

import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -51,6 +52,12 @@ public CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorName
return getDelegate().createCatalog(catalogHandle, connectorName, connector);
}

@Override
public Set<String> getSecuritySensitivePropertyNames(CatalogProperties catalogProperties)
{
return getDelegate().getSecuritySensitivePropertyNames(catalogProperties);
}

private CatalogFactory getDelegate()
{
CatalogFactory catalogFactory = delegate.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.connector;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Files;
import com.google.errorprone.annotations.ThreadSafe;
Expand All @@ -29,7 +28,6 @@
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.catalog.CatalogProperties;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.CatalogHandle.CatalogVersion;
import io.trino.spi.connector.ConnectorName;
import jakarta.annotation.PreDestroy;

Expand All @@ -56,7 +54,6 @@
import static io.trino.spi.StandardErrorCode.CATALOG_NOT_AVAILABLE;
import static io.trino.spi.StandardErrorCode.CATALOG_NOT_FOUND;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.CatalogHandle.createRootCatalogHandle;
import static io.trino.util.Executors.executeUntilFailure;
import static java.util.Objects.requireNonNull;

Expand All @@ -77,9 +74,14 @@ private enum State { CREATED, INITIALIZED, STOPPED }
private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

@Inject
public StaticCatalogManager(CatalogFactory catalogFactory, StaticCatalogManagerConfig config, @ForStartup Executor executor)
public StaticCatalogManager(
CatalogFactory catalogFactory,
StaticCatalogManagerConfig config,
@ForStartup Executor executor,
StaticCatalogPropertiesProvider propertiesProvider)
{
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
requireNonNull(propertiesProvider, "propertiesProvider is null");
List<String> disabledCatalogs = firstNonNull(config.getDisabledCatalogs(), ImmutableList.of());

ImmutableList.Builder<CatalogProperties> catalogProperties = ImmutableList.builder();
Expand Down Expand Up @@ -107,10 +109,7 @@ public StaticCatalogManager(CatalogFactory catalogFactory, StaticCatalogManagerC
log.warn("Catalog '%s' is using the deprecated connector name '%s'. The correct connector name is '%s'", catalogName, deprecatedConnectorName, connectorName);
}

catalogProperties.add(new CatalogProperties(
createRootCatalogHandle(new CatalogName(catalogName), new CatalogVersion("default")),
new ConnectorName(connectorName),
ImmutableMap.copyOf(properties)));
catalogProperties.add(propertiesProvider.getCatalogProperties(new CatalogName(catalogName), new ConnectorName(connectorName), properties));
}
this.catalogProperties = catalogProperties.build();
this.executor = requireNonNull(executor, "executor is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public void configure(Binder binder)
binder.bind(CatalogManager.class).to(StaticCatalogManager.class).in(Scopes.SINGLETON);

binder.bind(LazyRegister.class).asEagerSingleton();

binder.bind(StaticCatalogPropertiesProvider.class).in(Scopes.SINGLETON);
binder.bind(CatalogPropertiesProvider.class).to(StaticCatalogPropertiesProvider.class).in(Scopes.SINGLETON);
}

private static class LazyRegister
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.trino.connector;

import com.google.common.collect.ImmutableMap;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.catalog.CatalogProperties;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorName;

import java.util.Map;

import static io.trino.spi.connector.CatalogHandle.createRootCatalogHandle;

public class StaticCatalogPropertiesProvider
implements CatalogPropertiesProvider
{
@Override
public CatalogProperties getCatalogProperties(CatalogName catalogName, ConnectorName connectorName, Map<String, String> properties)
{
return new CatalogProperties(
createRootCatalogHandle(catalogName, new CatalogHandle.CatalogVersion("default")),
connectorName,
ImmutableMap.copyOf(properties));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.dispatcher;

import com.google.common.base.Function;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -44,6 +45,9 @@
import io.trino.spi.TrinoException;
import io.trino.spi.resourcegroups.SelectionContext;
import io.trino.spi.resourcegroups.SelectionCriteria;
import io.trino.sql.SensitiveStatementRedactor;
import io.trino.sql.tree.Expression;
import io.trino.sql.tree.Statement;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.weakref.jmx.Flatten;
Expand Down Expand Up @@ -84,6 +88,7 @@ public class DispatchManager
private final SessionPropertyDefaults sessionPropertyDefaults;
private final SessionPropertyManager sessionPropertyManager;
private final Tracer tracer;
private final SensitiveStatementRedactor sensitiveStatementRedactor;

private final int maxQueryLength;

Expand All @@ -107,6 +112,7 @@ public DispatchManager(
SessionPropertyDefaults sessionPropertyDefaults,
SessionPropertyManager sessionPropertyManager,
Tracer tracer,
SensitiveStatementRedactor sensitiveStatementRedactor,
QueryManagerConfig queryManagerConfig,
DispatchExecutor dispatchExecutor,
QueryMonitor queryMonitor)
Expand All @@ -121,6 +127,7 @@ public DispatchManager(
this.sessionPropertyDefaults = requireNonNull(sessionPropertyDefaults, "sessionPropertyDefaults is null");
this.sessionPropertyManager = sessionPropertyManager;
this.tracer = requireNonNull(tracer, "tracer is null");
this.sensitiveStatementRedactor = requireNonNull(sensitiveStatementRedactor, "sensitiveStatementRedactor is null");

this.maxQueryLength = queryManagerConfig.getMaxQueryLength();

Expand Down Expand Up @@ -240,7 +247,7 @@ private <C> void createQueryInternal(QueryId queryId, Span querySpan, Slug slug,
DispatchQuery dispatchQuery = dispatchQueryFactory.createDispatchQuery(
session,
sessionContext.getTransactionId(),
query,
getRedactedQueryProvider(preparedQuery, query),
preparedQuery,
slug,
selectionContext.getResourceGroupId());
Expand All @@ -266,8 +273,14 @@ private <C> void createQueryInternal(QueryId queryId, Span querySpan, Slug slug,
.setSource(sessionContext.getSource().orElse(null))
.build();
}
String redactedQuery = sensitiveStatementRedactor.redact(query);
Optional<String> preparedSql = Optional.ofNullable(preparedQuery).flatMap(PreparedQuery::getPrepareSql);
DispatchQuery failedDispatchQuery = failedDispatchQueryFactory.createFailedDispatchQuery(session, query, preparedSql, Optional.empty(), throwable);
DispatchQuery failedDispatchQuery = failedDispatchQueryFactory.createFailedDispatchQuery(
session,
redactedQuery,
preparedSql,
Optional.empty(),
throwable);
queryCreated(failedDispatchQuery);
// maintain proper order of calls such that EventListener has access to QueryInfo
// - add query to tracker
Expand All @@ -280,6 +293,13 @@ private <C> void createQueryInternal(QueryId queryId, Span querySpan, Slug slug,
}
}

private Function<Session, String> getRedactedQueryProvider(PreparedQuery preparedQuery, String query)
{
Statement statement = preparedQuery.getStatement();
List<Expression> parameters = preparedQuery.getParameters();
return session -> sensitiveStatementRedactor.redact(query, statement, session, parameters);
}

private boolean queryCreated(DispatchQuery dispatchQuery)
{
boolean queryAdded = queryTracker.addQuery(dispatchQuery);
Expand Down
Loading

0 comments on commit f15a34e

Please sign in to comment.