Skip to content

Commit

Permalink
Passing snowflake JDBC parameter into parameters fields instead of wi… (
Browse files Browse the repository at this point in the history
  • Loading branch information
chngpe authored Nov 15, 2024
1 parent 165f6b3 commit 7f8c53c
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public Map<String, String> createEnvironment() throws RuntimeException

HashMap<String, String> connectionEnvironment = new HashMap<>();
if (StringUtils.isNotBlank(glueConnectionName)) {
connectionEnvironment.put(DEFAULT_GLUE_CONNECTION, glueConnectionName);
Connection connection = getGlueConnection(glueConnectionName);
Map<String, String> connectionPropertiesWithSecret = new HashMap<>(connection.connectionPropertiesAsStrings());
connectionPropertiesWithSecret.putAll(authenticationConfigurationToMap(connection.authenticationConfiguration()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,56 @@
package com.amazonaws.athena.connectors.snowflake;

import com.amazonaws.athena.connectors.jdbc.JdbcEnvironmentProperties;
import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static com.amazonaws.athena.connector.lambda.connection.EnvironmentConstants.DATABASE;
import static com.amazonaws.athena.connector.lambda.connection.EnvironmentConstants.DEFAULT;
import static com.amazonaws.athena.connector.lambda.connection.EnvironmentConstants.DEFAULT_GLUE_CONNECTION;
import static com.amazonaws.athena.connector.lambda.connection.EnvironmentConstants.HOST;
import static com.amazonaws.athena.connector.lambda.connection.EnvironmentConstants.PORT;
import static com.amazonaws.athena.connector.lambda.connection.EnvironmentConstants.SCHEMA;
import static com.amazonaws.athena.connector.lambda.connection.EnvironmentConstants.WAREHOUSE;

public class SnowflakeEnvironmentProperties extends JdbcEnvironmentProperties
{
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeEnvironmentProperties.class);
private static final String WAREHOUSE_PROPERTY_KEY = "warehouse";
private static final String DB_PROPERTY_KEY = "db";
private static final String SCHEMA_PROPERTY_KEY = "schema";
private static final String SNOWFLAKE_ESCAPE_CHARACTER = "\"";

@Override
public Map<String, String> connectionPropertiesToEnvironment(Map<String, String> connectionProperties)
{
HashMap<String, String> environment = new HashMap<>();

// now construct jdbc string
String connectionString = getConnectionStringPrefix(connectionProperties) + connectionProperties.get(HOST);
// put it as environment variable so we can put it as JDBC parameters later when creation connection (not with JDBC)
Optional.ofNullable(connectionProperties.get(WAREHOUSE)).ifPresent(x -> environment.put(WAREHOUSE, x));
Optional.ofNullable(connectionProperties.get(DATABASE)).ifPresent(x -> environment.put(DATABASE, x));
Optional.ofNullable(connectionProperties.get(SCHEMA)).ifPresent(x -> environment.put(SCHEMA, x));

// now construct jdbc string, Snowflake JDBC should just be plain JDBC String. Parameter in JDBC string will get upper case.
StringBuilder connectionStringBuilder = new StringBuilder(getConnectionStringPrefix(connectionProperties));
connectionStringBuilder.append(connectionProperties.get(HOST));
if (connectionProperties.containsKey(PORT)) {
connectionString = connectionString + ":" + connectionProperties.get(PORT);
connectionStringBuilder
.append(":")
.append(connectionProperties.get(PORT));
}

String jdbcParametersString = getJdbcParameters(connectionProperties);
if (!Strings.isNullOrEmpty(jdbcParametersString)) {
LOGGER.info("JDBC parameters found, adding to JDBC String");
connectionStringBuilder.append(getSnowflakeJDBCParameterPrefix()).append(getJdbcParameters(connectionProperties));
}
connectionString = connectionString + getDatabase(connectionProperties) + getJdbcParameters(connectionProperties);

environment.put(DEFAULT, connectionString);
environment.put(DEFAULT, connectionStringBuilder.toString());
return environment;
}

Expand All @@ -55,22 +79,59 @@ protected String getConnectionStringPrefix(Map<String, String> connectionPropert
return "snowflake://jdbc:snowflake://";
}

/**
* For Snowflake, we don't put warehouse, database or schema information to the JDBC String to avoid casing issues.
* @param connectionProperties
* @return
*/
@Override
protected String getDatabase(Map<String, String> connectionProperties)
{
if (!connectionProperties.containsKey(SCHEMA)) {
logger.debug("No schema specified in connection string");
}

String databaseString = "/?warehouse=" + connectionProperties.get(WAREHOUSE)
+ "&db=" + connectionProperties.get(DATABASE)
+ "&schema=" + connectionProperties.get(SCHEMA);
return databaseString;
return "";
}

@Override
protected String getJdbcParametersSeparator()
{
return "&";
}

private String getSnowflakeJDBCParameterPrefix()
{
return "/?";
}

private static String getValueWrapperWithEscapedCharacter(String input)
{
return SNOWFLAKE_ESCAPE_CHARACTER + input + SNOWFLAKE_ESCAPE_CHARACTER;
}

private static boolean isGlueConnection(Map<String, String> properties)
{
return properties.containsKey(DEFAULT_GLUE_CONNECTION);
}

public static Map<String, String> getSnowFlakeParameter(Map<String, String> baseProperty, Map<String, String> connectionProperties)
{
logger.debug("getSnowFlakeParameter, Loading connection properties");
Map<String, String> parameters = new HashMap<>(baseProperty);

if (!isGlueConnection(connectionProperties)) {
return parameters;
}

if (!connectionProperties.containsKey(SCHEMA)) {
logger.debug("No schema specified in connection string");
}

parameters.put(WAREHOUSE_PROPERTY_KEY, getValueWrapperWithEscapedCharacter(connectionProperties.get(WAREHOUSE)));
parameters.put(DB_PROPERTY_KEY, getValueWrapperWithEscapedCharacter(connectionProperties.get(DATABASE)));

if (connectionProperties.containsKey(SCHEMA)) {
logger.debug("Found schema specified");
parameters.put(SCHEMA_PROPERTY_KEY, getValueWrapperWithEscapedCharacter(connectionProperties.get(SCHEMA)));
}

return parameters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ public SnowflakeMetadataHandler(java.util.Map<String, String> configOptions)
public SnowflakeMetadataHandler(DatabaseConnectionConfig databaseConnectionConfig, java.util.Map<String, String> configOptions)
{
this(databaseConnectionConfig, new GenericJdbcConnectionFactory(databaseConnectionConfig,
JDBC_PROPERTIES, new DatabaseConnectionInfo(SnowflakeConstants.SNOWFLAKE_DRIVER_CLASS,
SnowflakeConstants.SNOWFLAKE_DEFAULT_PORT)), configOptions);
SnowflakeEnvironmentProperties.getSnowFlakeParameter(JDBC_PROPERTIES, configOptions),
new DatabaseConnectionInfo(SnowflakeConstants.SNOWFLAKE_DRIVER_CLASS, SnowflakeConstants.SNOWFLAKE_DEFAULT_PORT)), configOptions);
}

@VisibleForTesting
Expand All @@ -166,7 +166,6 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca
{
LOGGER.debug("doGetDataSourceCapabilities: " + request);
ImmutableMap.Builder<String, List<OptimizationSubType>> capabilities = ImmutableMap.builder();

capabilities.put(DataSourceOptimizations.SUPPORTS_FILTER_PUSHDOWN.withSupportedSubTypes(
FilterPushdownSubType.SORTED_RANGE_SET, FilterPushdownSubType.NULLABLE_COMPARISON
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.sql.SQLException;

import static com.amazonaws.athena.connectors.snowflake.SnowflakeConstants.SNOWFLAKE_QUOTE_CHARACTER;
import static com.amazonaws.athena.connectors.snowflake.SnowflakeMetadataHandler.JDBC_PROPERTIES;

public class SnowflakeRecordHandler extends JdbcRecordHandler
{
Expand All @@ -59,7 +60,7 @@ public SnowflakeRecordHandler(java.util.Map<String, String> configOptions)
public SnowflakeRecordHandler(DatabaseConnectionConfig databaseConnectionConfig, java.util.Map<String, String> configOptions)
{
this(databaseConnectionConfig, new GenericJdbcConnectionFactory(databaseConnectionConfig,
SnowflakeMetadataHandler.JDBC_PROPERTIES,
SnowflakeEnvironmentProperties.getSnowFlakeParameter(JDBC_PROPERTIES, configOptions),
new DatabaseConnectionInfo(SnowflakeConstants.SNOWFLAKE_DRIVER_CLASS,
SnowflakeConstants.SNOWFLAKE_DEFAULT_PORT)), configOptions);
}
Expand Down

0 comments on commit 7f8c53c

Please sign in to comment.