diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/AwsAuthenticationConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/AwsAuthenticationConfiguration.java index d99e494179..5e8bcd8b63 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/AwsAuthenticationConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/AwsAuthenticationConfiguration.java @@ -11,11 +11,13 @@ import java.util.Map; +import static org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig.DEFAULT_AWS_REGION; + public class AwsAuthenticationConfiguration { @JsonProperty("region") @Size(min = 1, message = "Region cannot be empty string") - private String awsRegion; + private String awsRegion = DEFAULT_AWS_REGION; @JsonProperty("sts_role_arn") @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java index d988cc3a6e..1b21f63f68 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java @@ -18,7 +18,7 @@ public class OpenSearchSinkConfig { public static final boolean DEFAULT_ESTIMATE_BULK_SIZE_USING_COMPRESSION = false; public static final int DEFAULT_MAX_LOCAL_COMPRESSIONS_FOR_ESTIMATION = 2; public static final long DEFAULT_FLUSH_TIMEOUT = 60_000L; - private static final String DEFAULT_AWS_REGION = "us-east-1"; + public static final String DEFAULT_AWS_REGION = "us-east-1"; @Getter @JsonProperty("hosts") private List hosts; diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java index ffb34f9397..dcc1d2b655 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java @@ -5,8 +5,8 @@ package org.opensearch.dataprepper.plugins.sink.opensearch; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; @@ -25,12 +25,14 @@ import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.regions.Region; -import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.UUID; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; @@ -49,45 +51,19 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.sink.opensearch.ConnectionConfiguration.SERVERLESS; +import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.DISTRIBUTION_VERSION; @ExtendWith(MockitoExtension.class) class ConnectionConfigurationTests { - private static final String OPEN_SEARCH_SINK_CONFIGURATIONS = "open-search-sink-configurations.yaml"; - private static final String EMPTY_SINK_CONFIG = "empty-sink"; - private static final String ES6_DEFAULT_CONFIG = "es6-default"; - private static final String AWS_SERVERLESS_DEFAULT = "aws-serverless-default"; - private static final String AWS_SERVERLESS_NO_CERT = "aws-serverless-no-cert"; - private static final String BASIC_CREDENTIALS_NO_CERT = "basic-credentials-no-cert"; - private static final String BASIC_CREDENTIALS_NO_CERT_INSECURE = "basic-credentials-no-cert-insecure"; - private static final String BASIC_CREDENTIALS_WITH_CERT = "basic-credentials-with-cert"; - private static final String AWS_REGION_ONLY = "aws-region-only"; - private static final String SERVERLESS_OPTIONS = "serverless-options"; - private static final String AWS_REGION_ONLY_INSECURE = "aws-region-only-insecure"; - private static final String AWS_WITH_CERT = "aws-with-cert"; - private static final String AWS_WITH_CERT_AND_ARN = "aws-with-cert-and-arn"; - private static final String AWS_WITH_2_HEADER = "aws-with-2-header"; - private static final String VALID_PROXY_IP = "valid-proxy-ip"; - private static final String VALID_PROXY_HOST_NAME = "valid-proxy-host-name"; - private static final String VALID_HTTP_PROXY_SCHEME = "valid-http-proxy-scheme"; - private static final String INVALID_HTTP_PROXY_PORT = "invalid-http-proxy-port"; - private static final String INVALID_HTTP_PROXY_NO_PORT = "invalid-http-proxy-no-port"; - private static final String INVALID_HTTP_PROXY_PORT_NOT_IN_RANGE = "invalid-http-proxy-port-not-in-range"; - private static final String INVALID_HTTP_PROXY_NOT_HTTP = "invalid-http-proxy-not-http"; - + private static final String PROXY_PARAMETER = "proxy"; private final List TEST_HOSTS = Collections.singletonList("http://localhost:9200"); - private final String TEST_USERNAME = "test-username"; - private final String TEST_PASSWORD = "test-password"; + private final String TEST_USERNAME = "admin"; + private final String TEST_PASSWORD = "admin"; private final Integer TEST_CONNECT_TIMEOUT = 5; private final Integer TEST_SOCKET_TIMEOUT = 10; + private final String TEST_CERT_PATH = Objects.requireNonNull(getClass().getClassLoader().getResource("test-ca.pem")).getFile(); private final String TEST_ROLE = "arn:aws:iam::123456789012:role/test-role"; - private final String TEST_NETWORK_POLICY = "test network policy"; - private final String TEST_COLLECTION_NAME = "test collection"; - private final String TEST_VPCE_ID = "test vpce id"; - private final String TEST_EXTERNAL_ID = "test-external-id"; - private final String TEST_HEADER_NAME_1 = "header1"; - private final String TEST_HEADER_NAME_2 = "header2"; - private final String TEST_HEADER_VALUE_1 = "test-header-1"; - private final String TEST_HEADER_VALUE_2 = "test-header-2"; @Mock private ApacheHttpClient.Builder apacheHttpClientBuilder; @@ -95,12 +71,13 @@ class ConnectionConfigurationTests { private ApacheHttpClient apacheHttpClient; @Mock private AwsCredentialsSupplier awsCredentialsSupplier; - + ObjectMapper objectMapper; @Test - void testReadConnectionConfigurationDefault() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(EMPTY_SINK_CONFIG); + void testReadConnectionConfigurationDefault() throws JsonProcessingException { + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( + TEST_HOSTS, null, null, null, null, false, null, null, null, false); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); assertEquals(TEST_HOSTS, connectionConfiguration.getHosts()); @@ -114,16 +91,21 @@ void testReadConnectionConfigurationDefault() throws IOException { } @Test - void testReadConnectionConfigurationES6Default() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(ES6_DEFAULT_CONFIG); + void testReadConnectionConfigurationES6Default() throws JsonProcessingException { + final Map configMetadata = generateConfigurationMetadata( + TEST_HOSTS, null, null, null, null, true, null, null, null, false); + configMetadata.put(DISTRIBUTION_VERSION, "es6"); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(configMetadata); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); assertFalse(connectionConfiguration.isRequestCompressionEnabled()); } @Test - void testReadConnectionConfigurationAwsOptionServerlessDefault() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(AWS_SERVERLESS_DEFAULT); + void testReadConnectionConfigurationAwsOptionServerlessDefault() throws JsonProcessingException { + final String testArn = TEST_ROLE; + final Map configMetadata = generateConfigurationMetadataWithAwsOption(TEST_HOSTS, null, null, null, null, true, false, null, testArn, null, TEST_CERT_PATH, false, Collections.emptyMap()); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(configMetadata); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); assertTrue(connectionConfiguration.isServerless()); @@ -131,7 +113,8 @@ void testReadConnectionConfigurationAwsOptionServerlessDefault() throws IOExcept @Test void testCreateClientDefault() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(EMPTY_SINK_CONFIG); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( + TEST_HOSTS, null, null, null, null, false, null, null, null, false); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); final RestHighLevelClient client = connectionConfiguration.createClient(awsCredentialsSupplier); @@ -141,7 +124,8 @@ void testCreateClientDefault() throws IOException { @Test void testCreateOpenSearchClientDefault() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(EMPTY_SINK_CONFIG); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( + TEST_HOSTS, null, null, null, null, false, null, null, null, false); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); final RestHighLevelClient client = connectionConfiguration.createClient(awsCredentialsSupplier); @@ -155,9 +139,13 @@ void testCreateOpenSearchClientDefault() throws IOException { @Test void testCreateOpenSearchClientAwsServerlessDefault() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(AWS_SERVERLESS_NO_CERT); + final Map configMetadata = generateConfigurationMetadata( + TEST_HOSTS, null, null, null, null, true, null, null, null, false); + configMetadata.put(SERVERLESS, true); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(configMetadata); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); + final AwsCredentialsProvider awsCredentialsProvider = mock(AwsCredentialsProvider.class); when(awsCredentialsSupplier.getProvider(any())).thenReturn(awsCredentialsProvider); @@ -179,8 +167,9 @@ void testCreateOpenSearchClientAwsServerlessDefault() throws IOException { } @Test - void testReadConnectionConfigurationWithBasicCredentialsAndNoCert() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(BASIC_CREDENTIALS_NO_CERT); + void testReadConnectionConfigurationWithDeprecatedBasicCredentialsAndNoCert() throws JsonProcessingException { + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( + TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, false); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); assertEquals(TEST_HOSTS, connectionConfiguration.getHosts()); @@ -191,10 +180,24 @@ void testReadConnectionConfigurationWithBasicCredentialsAndNoCert() throws IOExc assertFalse(connectionConfiguration.isAwsSigv4()); } + @Test + void testCreateClientWithDeprecatedBasicCredentialsAndNoCert() throws IOException { + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( + TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, false); + final ConnectionConfiguration connectionConfiguration = + ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); + final RestHighLevelClient client = connectionConfiguration.createClient(awsCredentialsSupplier); + assertNotNull(client); + client.close(); + } @Test void testCreateClientWithBasicCredentialsAndNoCert() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(BASIC_CREDENTIALS_NO_CERT); + final Map configurationMetadata = generateConfigurationMetadata( + TEST_HOSTS, null, null, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, false); + configurationMetadata.put("username", TEST_USERNAME); + configurationMetadata.put("password", TEST_PASSWORD); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(configurationMetadata); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); final RestHighLevelClient client = connectionConfiguration.createClient(awsCredentialsSupplier); @@ -202,10 +205,10 @@ void testCreateClientWithBasicCredentialsAndNoCert() throws IOException { client.close(); } - @Test void testCreateOpenSearchClientNoCert() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(BASIC_CREDENTIALS_NO_CERT); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( + TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, false); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); final RestHighLevelClient client = connectionConfiguration.createClient(awsCredentialsSupplier); @@ -218,7 +221,8 @@ void testCreateOpenSearchClientNoCert() throws IOException { @Test void testCreateClientInsecure() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(BASIC_CREDENTIALS_NO_CERT_INSECURE); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( + TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, true); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); final RestHighLevelClient client = connectionConfiguration.createClient(awsCredentialsSupplier); @@ -228,7 +232,8 @@ void testCreateClientInsecure() throws IOException { @Test void testCreateOpenSearchClientInsecure() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(BASIC_CREDENTIALS_NO_CERT_INSECURE); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( + TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, true); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); final RestHighLevelClient client = connectionConfiguration.createClient(awsCredentialsSupplier); @@ -241,7 +246,8 @@ void testCreateOpenSearchClientInsecure() throws IOException { @Test void testCreateClientWithCertPath() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(BASIC_CREDENTIALS_WITH_CERT); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( + TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); final RestHighLevelClient client = connectionConfiguration.createClient(awsCredentialsSupplier); @@ -251,7 +257,8 @@ void testCreateClientWithCertPath() throws IOException { @Test void testCreateOpenSearchClientWithCertPath() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(BASIC_CREDENTIALS_WITH_CERT); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( + TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); final RestHighLevelClient client = connectionConfiguration.createClient(awsCredentialsSupplier); @@ -264,26 +271,58 @@ void testCreateOpenSearchClientWithCertPath() throws IOException { @Test void testCreateClientWithAWSSigV4AndRegion() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(AWS_REGION_ONLY); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( + TEST_HOSTS, null, null, null, null, true, "us-west-2", null, null, false); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); - assertEquals("us-east-2", connectionConfiguration.getAwsRegion()); + assertEquals("us-west-2", connectionConfiguration.getAwsRegion()); assertTrue(connectionConfiguration.isAwsSigv4()); } @Test void testServerlessOptions() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(SERVERLESS_OPTIONS); + final String serverlessNetworkPolicyName = UUID.randomUUID().toString(); + final String serverlessCollectionName = UUID.randomUUID().toString(); + final String serverlessVpceId = UUID.randomUUID().toString(); + + final Map metadata = new HashMap<>(); + final Map awsOptionMetadata = new HashMap<>(); + final Map serverlessOptionsMetadata = new HashMap<>(); + serverlessOptionsMetadata.put("network_policy_name", serverlessNetworkPolicyName); + serverlessOptionsMetadata.put("collection_name", serverlessCollectionName); + serverlessOptionsMetadata.put("vpce_id", serverlessVpceId); + awsOptionMetadata.put("region", UUID.randomUUID().toString()); + awsOptionMetadata.put("serverless", true); + awsOptionMetadata.put("serverless_options", serverlessOptionsMetadata); + awsOptionMetadata.put("sts_role_arn", TEST_ROLE); + metadata.put("hosts", TEST_HOSTS); + metadata.put("username", UUID.randomUUID().toString()); + metadata.put("password", UUID.randomUUID().toString()); + metadata.put("connect_timeout", 1); + metadata.put("socket_timeout", 1); + metadata.put("aws", awsOptionMetadata); + + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(metadata); + final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); + assertThat(connectionConfiguration.getServerlessNetworkPolicyName(), equalTo(serverlessNetworkPolicyName)); + assertThat(connectionConfiguration.getServerlessCollectionName(), equalTo(serverlessCollectionName)); + assertThat(connectionConfiguration.getServerlessVpceId(), equalTo(serverlessVpceId)); + } + + @Test + void testCreateClientWithAWSSigV4DefaultRegion() throws IOException { + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( + TEST_HOSTS, null, null, null, null, true, null, null, null, false); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); - assertThat(connectionConfiguration.getServerlessNetworkPolicyName(), equalTo(TEST_NETWORK_POLICY)); - assertThat(connectionConfiguration.getServerlessCollectionName(), equalTo(TEST_COLLECTION_NAME)); - assertThat(connectionConfiguration.getServerlessVpceId(), equalTo(TEST_VPCE_ID)); + assertEquals("us-east-1", connectionConfiguration.getAwsRegion()); + assertTrue(connectionConfiguration.isAwsSigv4()); } @Test void testCreateClientWithAWSSigV4AndInsecure() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(AWS_REGION_ONLY_INSECURE); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( + TEST_HOSTS, null, null, null, null, true, null, null, null, true); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); assertEquals("us-east-1", connectionConfiguration.getAwsRegion()); @@ -292,7 +331,8 @@ void testCreateClientWithAWSSigV4AndInsecure() throws IOException { @Test void testCreateClientWithAWSSigV4AndCertPath() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(AWS_WITH_CERT); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( + TEST_HOSTS, null, null, null, null, true, null, null, TEST_CERT_PATH, false); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); assertEquals("us-east-1", connectionConfiguration.getAwsRegion()); @@ -300,8 +340,9 @@ void testCreateClientWithAWSSigV4AndCertPath() throws IOException { } @Test - void testCreateClientWithAWSSigV4AndSTSRole() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(AWS_WITH_CERT_AND_ARN); + void testCreateClientWithAWSSigV4AndSTSRole() throws JsonProcessingException { + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( + TEST_HOSTS, null, null, null, null, true, null, TEST_ROLE, TEST_CERT_PATH, false); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); assertThat(connectionConfiguration, notNullValue()); @@ -324,10 +365,15 @@ void testCreateClientWithAWSSigV4AndSTSRole() throws IOException { } @Test - void testCreateOpenSearchClientWithAWSSigV4AndSTSRole() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(AWS_WITH_CERT_AND_ARN); + void testCreateOpenSearchClientWithAWSSigV4AndSTSRole() throws JsonProcessingException { + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( + TEST_HOSTS, null, null, null, null, true, null, TEST_ROLE, TEST_CERT_PATH, false); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); + assertThat(connectionConfiguration, notNullValue()); + assertThat(connectionConfiguration.getAwsRegion(), equalTo("us-east-1")); + assertThat(connectionConfiguration.isAwsSigv4(), equalTo(true)); + assertThat(connectionConfiguration.getAwsStsRoleArn(), equalTo(TEST_ROLE)); final AwsCredentialsProvider awsCredentialsProvider = mock(AwsCredentialsProvider.class); when(awsCredentialsSupplier.getProvider(any())).thenReturn(awsCredentialsProvider); @@ -351,8 +397,15 @@ void testCreateOpenSearchClientWithAWSSigV4AndSTSRole() throws IOException { } @Test - void testCreateClientWithAWSOption() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(AWS_WITH_2_HEADER); + void testCreateClientWithAWSOption() throws JsonProcessingException { + final String headerName1 = UUID.randomUUID().toString(); + final String headerValue1 = UUID.randomUUID().toString(); + final String headerName2 = UUID.randomUUID().toString(); + final String headerValue2 = UUID.randomUUID().toString(); + final String testArn = TEST_ROLE; + final String externalId = UUID.randomUUID().toString(); + final Map configurationMetadata = generateConfigurationMetadataWithAwsOption(TEST_HOSTS, null, null, null, null, false, true, null, testArn, externalId, null,false, Map.of(headerName1, headerValue1, headerName2, headerValue2)); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(configurationMetadata); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); @@ -370,18 +423,25 @@ void testCreateClientWithAWSOption() throws IOException { final AwsCredentialsOptions actualOptions = awsCredentialsOptionsArgumentCaptor.getValue(); assertThat(actualOptions.getStsRoleArn(), equalTo(TEST_ROLE)); - assertThat(actualOptions.getStsExternalId(), equalTo(TEST_EXTERNAL_ID)); + assertThat(actualOptions.getStsExternalId(), equalTo(externalId)); assertThat(actualOptions.getStsHeaderOverrides(), notNullValue()); assertThat(actualOptions.getStsHeaderOverrides().size(), equalTo(2)); - assertThat(actualOptions.getStsHeaderOverrides(), hasKey(TEST_HEADER_NAME_2)); - assertThat(actualOptions.getStsHeaderOverrides().get(TEST_HEADER_NAME_1), equalTo(TEST_HEADER_VALUE_1)); - assertThat(actualOptions.getStsHeaderOverrides(), hasKey(TEST_HEADER_NAME_2)); - assertThat(actualOptions.getStsHeaderOverrides().get(TEST_HEADER_NAME_2), equalTo(TEST_HEADER_VALUE_2)); + assertThat(actualOptions.getStsHeaderOverrides(), hasKey(headerName1)); + assertThat(actualOptions.getStsHeaderOverrides().get(headerName1), equalTo(headerValue1)); + assertThat(actualOptions.getStsHeaderOverrides(), hasKey(headerName2)); + assertThat(actualOptions.getStsHeaderOverrides().get(headerName2), equalTo(headerValue2)); } @Test - void testCreateOpenSearchClientWithAWSOption() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(AWS_WITH_2_HEADER); + void testCreateOpenSearchClientWithAWSOption() throws JsonProcessingException { + final String headerName1 = UUID.randomUUID().toString(); + final String headerValue1 = UUID.randomUUID().toString(); + final String headerName2 = UUID.randomUUID().toString(); + final String headerValue2 = UUID.randomUUID().toString(); + final String testArn = TEST_ROLE; + final String externalId = UUID.randomUUID().toString(); + final Map configurationMetadata = generateConfigurationMetadataWithAwsOption(TEST_HOSTS, null, null, null, null, false, true, null, testArn, externalId, TEST_CERT_PATH, false, Map.of(headerName1, headerValue1, headerName2, headerValue2)); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(configurationMetadata); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); @@ -396,7 +456,7 @@ void testCreateOpenSearchClientWithAWSOption() throws IOException { final OpenSearchClient openSearchClient = connectionConfiguration.createOpenSearchClient(client, awsCredentialsSupplier); assertNotNull(openSearchClient); - assertThat(openSearchClient._transport(), instanceOf(AwsSdk2Transport.class)); + assertThat(openSearchClient._transport(), instanceOf(AwsSdk2Transport.class)); final AwsSdk2Transport opensearchTransport = (AwsSdk2Transport) openSearchClient._transport(); assertThat(opensearchTransport.options().credentials(), equalTo(awsCredentialsProvider)); @@ -407,15 +467,21 @@ void testCreateOpenSearchClientWithAWSOption() throws IOException { assertThat(actualOptions.getStsRoleArn(), equalTo(TEST_ROLE)); assertThat(actualOptions.getStsHeaderOverrides(), notNullValue()); assertThat(actualOptions.getStsHeaderOverrides().size(), equalTo(2)); - assertThat(actualOptions.getStsHeaderOverrides(), hasKey(TEST_HEADER_NAME_2)); - assertThat(actualOptions.getStsHeaderOverrides().get(TEST_HEADER_NAME_1), equalTo(TEST_HEADER_VALUE_1)); - assertThat(actualOptions.getStsHeaderOverrides(), hasKey(TEST_HEADER_NAME_2)); - assertThat(actualOptions.getStsHeaderOverrides().get(TEST_HEADER_NAME_2), equalTo(TEST_HEADER_VALUE_2)); + assertThat(actualOptions.getStsHeaderOverrides(), hasKey(headerName1)); + assertThat(actualOptions.getStsHeaderOverrides().get(headerName1), equalTo(headerValue1)); + assertThat(actualOptions.getStsHeaderOverrides(), hasKey(headerName2)); + assertThat(actualOptions.getStsHeaderOverrides().get(headerName2), equalTo(headerValue2)); } @Test - void testCreateClientWithAWSSigV4AndHeaderOverrides() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(AWS_WITH_2_HEADER); + void testCreateClientWithAWSSigV4AndHeaderOverrides() throws JsonProcessingException { + final String headerName1 = UUID.randomUUID().toString(); + final String headerValue1 = UUID.randomUUID().toString(); + final String headerName2 = UUID.randomUUID().toString(); + final String headerValue2 = UUID.randomUUID().toString(); + final Map configurationMetadata = generateConfigurationMetadata(TEST_HOSTS, null, null, null, null, true, null, TEST_ROLE, TEST_CERT_PATH, false); + configurationMetadata.put("aws_sts_header_overrides", Map.of(headerName1, headerValue1, headerName2, headerValue2)); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(configurationMetadata); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); @@ -436,15 +502,21 @@ void testCreateClientWithAWSSigV4AndHeaderOverrides() throws IOException { assertThat(actualOptions.getRegion(), equalTo(Region.US_EAST_1)); assertThat(actualOptions.getStsHeaderOverrides(), notNullValue()); assertThat(actualOptions.getStsHeaderOverrides().size(), equalTo(2)); - assertThat(actualOptions.getStsHeaderOverrides(), hasKey(TEST_HEADER_NAME_2)); - assertThat(actualOptions.getStsHeaderOverrides().get(TEST_HEADER_NAME_1), equalTo(TEST_HEADER_VALUE_1)); - assertThat(actualOptions.getStsHeaderOverrides(), hasKey(TEST_HEADER_NAME_2)); - assertThat(actualOptions.getStsHeaderOverrides().get(TEST_HEADER_NAME_2), equalTo(TEST_HEADER_VALUE_2)); + assertThat(actualOptions.getStsHeaderOverrides(), hasKey(headerName1)); + assertThat(actualOptions.getStsHeaderOverrides().get(headerName1), equalTo(headerValue1)); + assertThat(actualOptions.getStsHeaderOverrides(), hasKey(headerName2)); + assertThat(actualOptions.getStsHeaderOverrides().get(headerName2), equalTo(headerValue2)); } @Test - void testCreateOpenSearchClientWithAWSSigV4AndHeaderOverrides() throws IOException { - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(AWS_WITH_2_HEADER); + void testCreateOpenSearchClientWithAWSSigV4AndHeaderOverrides() throws JsonProcessingException { + final String headerName1 = UUID.randomUUID().toString(); + final String headerValue1 = UUID.randomUUID().toString(); + final String headerName2 = UUID.randomUUID().toString(); + final String headerValue2 = UUID.randomUUID().toString(); + final Map configurationMetadata = generateConfigurationMetadata(TEST_HOSTS, null, null, null, null, true, null, TEST_ROLE, TEST_CERT_PATH, false); + configurationMetadata.put("aws_sts_header_overrides", Map.of(headerName1, headerValue1, headerName2, headerValue2)); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(configurationMetadata); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); @@ -472,16 +544,19 @@ void testCreateOpenSearchClientWithAWSSigV4AndHeaderOverrides() throws IOExcepti assertThat(actualOptions.getRegion(), equalTo(Region.US_EAST_1)); assertThat(actualOptions.getStsHeaderOverrides(), notNullValue()); assertThat(actualOptions.getStsHeaderOverrides().size(), equalTo(2)); - assertThat(actualOptions.getStsHeaderOverrides(), hasKey(TEST_HEADER_NAME_2)); - assertThat(actualOptions.getStsHeaderOverrides().get(TEST_HEADER_NAME_1), equalTo(TEST_HEADER_VALUE_1)); - assertThat(actualOptions.getStsHeaderOverrides(), hasKey(TEST_HEADER_NAME_2)); - assertThat(actualOptions.getStsHeaderOverrides().get(TEST_HEADER_NAME_2), equalTo(TEST_HEADER_VALUE_2)); + assertThat(actualOptions.getStsHeaderOverrides(), hasKey(headerName1)); + assertThat(actualOptions.getStsHeaderOverrides().get(headerName1), equalTo(headerValue1)); + assertThat(actualOptions.getStsHeaderOverrides(), hasKey(headerName2)); + assertThat(actualOptions.getStsHeaderOverrides().get(headerName2), equalTo(headerValue2)); } @Test void testCreateAllClients_WithValidHttpProxy_HostIP() throws IOException { + final Map metadata = generateConfigurationMetadata( + TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false); final String testHttpProxy = "121.121.121.121:80"; - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(VALID_PROXY_IP); + metadata.put(PROXY_PARAMETER, testHttpProxy); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(metadata); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); assertEquals(connectionConfiguration.getProxy().get(), testHttpProxy); @@ -493,8 +568,11 @@ void testCreateAllClients_WithValidHttpProxy_HostIP() throws IOException { @Test void testCreateAllClients_WithValidHttpProxy_HostName() throws IOException { + final Map metadata = generateConfigurationMetadata( + TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false); final String testHttpProxy = "example.com:80"; - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(VALID_PROXY_HOST_NAME); + metadata.put(PROXY_PARAMETER, testHttpProxy); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(metadata); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); assertEquals(connectionConfiguration.getProxy().get(), testHttpProxy); @@ -506,8 +584,11 @@ void testCreateAllClients_WithValidHttpProxy_HostName() throws IOException { @Test void testCreateAllClients_WithValidHttpProxy_SchemeProvided() throws IOException { + final Map metadata = generateConfigurationMetadata( + TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false); final String testHttpProxy = "http://example.com:4350"; - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(VALID_HTTP_PROXY_SCHEME); + metadata.put(PROXY_PARAMETER, testHttpProxy); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(metadata); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); assertEquals(connectionConfiguration.getProxy().get(), testHttpProxy); @@ -518,9 +599,12 @@ void testCreateAllClients_WithValidHttpProxy_SchemeProvided() throws IOException } @Test - void testCreateClient_WithInvalidHttpProxy_InvalidPort() throws IOException { + void testCreateClient_WithInvalidHttpProxy_InvalidPort() throws JsonProcessingException { + final Map metadata = generateConfigurationMetadata( + TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false); final String testHttpProxy = "example.com:port"; - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(INVALID_HTTP_PROXY_PORT); + metadata.put(PROXY_PARAMETER, testHttpProxy); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(metadata); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); assertEquals(connectionConfiguration.getProxy().get(), testHttpProxy); @@ -528,27 +612,36 @@ void testCreateClient_WithInvalidHttpProxy_InvalidPort() throws IOException { } @Test - void testCreateClient_WithInvalidHttpProxy_NoPort() throws IOException { + void testCreateClient_WithInvalidHttpProxy_NoPort() throws JsonProcessingException { + final Map metadata = generateConfigurationMetadata( + TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false); final String testHttpProxy = "example.com"; - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(INVALID_HTTP_PROXY_NO_PORT); + metadata.put(PROXY_PARAMETER, testHttpProxy); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(metadata); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); assertThrows(IllegalArgumentException.class, () -> connectionConfiguration.createClient(awsCredentialsSupplier)); } @Test - void testCreateClient_WithInvalidHttpProxy_PortNotInRange() throws IOException { + void testCreateClient_WithInvalidHttpProxy_PortNotInRange() throws JsonProcessingException { + final Map metadata = generateConfigurationMetadata( + TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false); final String testHttpProxy = "example.com:888888"; - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(INVALID_HTTP_PROXY_PORT_NOT_IN_RANGE); + metadata.put(PROXY_PARAMETER, testHttpProxy); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(metadata); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); assertThrows(IllegalArgumentException.class, () -> connectionConfiguration.createClient(awsCredentialsSupplier)); } @Test - void testCreateClient_WithInvalidHttpProxy_NotHttp() throws IOException { + void testCreateClient_WithInvalidHttpProxy_NotHttp() throws JsonProcessingException { + final Map metadata = generateConfigurationMetadata( + TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false); final String testHttpProxy = "socket://example.com:port"; - final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(INVALID_HTTP_PROXY_NOT_HTTP); + metadata.put(PROXY_PARAMETER, testHttpProxy); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(metadata); final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); assertEquals(connectionConfiguration.getProxy().get(), testHttpProxy); @@ -564,15 +657,64 @@ void testCreateClient_WithConnectionConfigurationBuilder_ProxyOptionalObjectShou assertNotNull(client); client.close(); } - - private OpenSearchSinkConfig generateOpenSearchSinkConfig(String pipelineName) throws IOException { - final File configurationFile = new File(getClass().getClassLoader().getResource(OPEN_SEARCH_SINK_CONFIGURATIONS).getFile()); - objectMapper = new ObjectMapper(new YAMLFactory()); - final Map pipelineConfigs = objectMapper.readValue(configurationFile, Map.class); - final Map pipelineConfig = (Map) pipelineConfigs.get(pipelineName); - final Map sinkMap = (Map) pipelineConfig.get("sink"); - final Map opensearchSinkMap = (Map) sinkMap.get("opensearch"); - String json = objectMapper.writeValueAsString(opensearchSinkMap); + + private OpenSearchSinkConfig generateOpenSearchSinkConfig( + final List hosts, final String username, final String password, + final Integer connectTimeout, final Integer socketTimeout, final boolean awsSigv4, final String awsRegion, + final String awsStsRoleArn, final String certPath, final boolean insecure) throws JsonProcessingException { + + final Map metadata = generateConfigurationMetadata(hosts, username, password, connectTimeout, socketTimeout, awsSigv4, awsRegion, awsStsRoleArn, certPath, insecure); + return getOpenSearchSinkConfigByConfigMetadata(metadata); + } + + private Map generateConfigurationMetadata( + final List hosts, final String username, final String password, + final Integer connectTimeout, final Integer socketTimeout, final boolean awsSigv4, final String awsRegion, + final String awsStsRoleArn, final String certPath, final boolean insecure) { + final Map metadata = new HashMap<>(); + metadata.put("hosts", hosts); + metadata.put("username", username); + metadata.put("password", password); + metadata.put("connect_timeout", connectTimeout); + metadata.put("socket_timeout", socketTimeout); + metadata.put("aws_sigv4", awsSigv4); + if (awsRegion != null) { + metadata.put("aws_region", awsRegion); + } + metadata.put("aws_sts_role_arn", awsStsRoleArn); + metadata.put("cert", certPath); + metadata.put("insecure", insecure); + return metadata; + } + + + private Map generateConfigurationMetadataWithAwsOption( + final List hosts, final String username, final String password, + final Integer connectTimeout, final Integer socketTimeout, final boolean serverless, final boolean awsSigv4, final String awsRegion, + final String awsStsRoleArn, final String awsStsExternalId, final String certPath, final boolean insecure, Map headerOverridesMap) { + final Map metadata = new HashMap<>(); + final Map awsOptionMetadata = new HashMap<>(); + metadata.put("hosts", hosts); + metadata.put("username", username); + metadata.put("password", password); + metadata.put("connect_timeout", connectTimeout); + metadata.put("socket_timeout", socketTimeout); + if (awsRegion != null) { + awsOptionMetadata.put("region", awsRegion); + } + awsOptionMetadata.put("serverless", serverless); + awsOptionMetadata.put("sts_role_arn", awsStsRoleArn); + awsOptionMetadata.put("sts_external_id", awsStsExternalId); + awsOptionMetadata.put("sts_header_overrides", headerOverridesMap); + metadata.put("aws", awsOptionMetadata); + metadata.put("cert", certPath); + metadata.put("insecure", insecure); + return metadata; + } + + private OpenSearchSinkConfig getOpenSearchSinkConfigByConfigMetadata(final Map metadata) throws JsonProcessingException { + objectMapper = new ObjectMapper(); + String json = new ObjectMapper().writeValueAsString(metadata); OpenSearchSinkConfig openSearchSinkConfig = objectMapper.readValue(json, OpenSearchSinkConfig.class); return openSearchSinkConfig;