diff --git a/data-prepper-plugins/otel-trace-group-processor/build.gradle b/data-prepper-plugins/otel-trace-group-processor/build.gradle
index 1360a670fd..943a083910 100644
--- a/data-prepper-plugins/otel-trace-group-processor/build.gradle
+++ b/data-prepper-plugins/otel-trace-group-processor/build.gradle
@@ -11,5 +11,7 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'io.micrometer:micrometer-core'
+ implementation 'software.amazon.awssdk:arns'
+ implementation 'software.amazon.awssdk:auth'
testImplementation project(':data-prepper-api').sourceSets.test.output
}
\ No newline at end of file
diff --git a/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/AuthConfig.java b/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/AuthConfig.java
new file mode 100644
index 0000000000..cb5dc11fc8
--- /dev/null
+++ b/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/AuthConfig.java
@@ -0,0 +1,21 @@
+package org.opensearch.dataprepper.plugins.processor.oteltracegroup;
+
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+
+public class AuthConfig {
+ @JsonPropertyDescription("A string that contains the username and is used in the " +
+ "internal users YAML configuration file of your OpenSearch cluster.")
+ private String username;
+
+ @JsonPropertyDescription("A string that contains the password and is used in the " +
+ "internal users YAML configuration file of your OpenSearch cluster.")
+ private String password;
+
+ public String getUsername() {
+ return username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+}
diff --git a/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/AwsOption.java b/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/AwsOption.java
new file mode 100644
index 0000000000..613b39540b
--- /dev/null
+++ b/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/AwsOption.java
@@ -0,0 +1,98 @@
+package org.opensearch.dataprepper.plugins.processor.oteltracegroup;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import jakarta.validation.constraints.AssertTrue;
+import jakarta.validation.constraints.Size;
+import software.amazon.awssdk.arns.Arn;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+public class AwsOption {
+ public static final String REGION = "region";
+ public static final String STS_ROLE_ARN = "sts_role_arn";
+ public static final String STS_EXTERNAL_ID = "sts_external_id";
+ public static final String STS_HEADER_OVERRIDES = "sts_header_overrides";
+ public static final String SERVERLESS = "serverless";
+ public static final String SERVERLESS_OPTIONS = "serverless_options";
+ private static final String AWS_IAM = "iam";
+ private static final String AWS_IAM_ROLE = "role";
+
+ static final String DEFAULT_AWS_REGION = "us-east-1";
+
+ @JsonProperty(SERVERLESS)
+ private boolean serverless;
+
+ @JsonProperty(REGION)
+ @JsonPropertyDescription("A string that represents the AWS Region of the Amazon OpenSearch Service domain, " +
+ "for example, us-west-2
. Only applies to Amazon OpenSearch Service.")
+ private String region = DEFAULT_AWS_REGION;
+
+ @JsonProperty(STS_ROLE_ARN)
+ @JsonPropertyDescription("An AWS Identity and Access Management (IAM) role that the sink plugin assumes to sign the request to Amazon OpenSearch Service. " +
+ "If not provided, the plugin uses the default credentials.")
+ @Size(max = 2048, message = "sts_role_arn length cannot exceed 2048.")
+ private String stsRoleArn;
+
+ @JsonProperty(STS_EXTERNAL_ID)
+ @JsonPropertyDescription("The external ID to attach to AssumeRole requests from AWS STS.")
+ private String stsExternalId;
+
+ @JsonProperty(STS_HEADER_OVERRIDES)
+ @JsonPropertyDescription("A map of header overrides that the IAM role assumes for the plugin.")
+ private Map stsHeaderOverrides = Collections.emptyMap();
+
+ @JsonProperty(SERVERLESS_OPTIONS)
+ @JsonPropertyDescription("The network configuration options available when the backend of the opensearch
sink is set to Amazon OpenSearch Serverless.")
+ private ServerlessOptions serverlessOptions;
+
+ public String getRegion() {
+ return region;
+ }
+
+ public String getStsRoleArn() {
+ return stsRoleArn;
+ }
+
+ public String getStsExternalId() {
+ return stsExternalId;
+ }
+
+ public Map getStsHeaderOverrides() {
+ return stsHeaderOverrides;
+ }
+
+ public boolean isServerless() {
+ return serverless;
+ }
+
+ public ServerlessOptions getServerlessOptions() {
+ return serverlessOptions;
+ }
+
+ @AssertTrue(message = "sts_role_arn must be an null or a valid IAM role ARN.")
+ boolean isValidStsRoleArn() {
+ if (stsRoleArn == null) {
+ return true;
+ }
+ final Arn arn = getArn(stsRoleArn);
+ if (!AWS_IAM.equals(arn.service())) {
+ return false;
+ }
+ final Optional resourceType = arn.resource().resourceType();
+ if (resourceType.isEmpty() || !resourceType.get().equals(AWS_IAM_ROLE)) {
+ return false;
+ }
+ return true;
+ }
+
+ private Arn getArn(final String awsStsRoleArn) {
+ try {
+ return Arn.fromString(awsStsRoleArn);
+ } catch (final Exception e) {
+ throw new IllegalArgumentException(String.format("Invalid ARN format for awsStsRoleArn. Check the format of %s", awsStsRoleArn));
+ }
+ }
+}
diff --git a/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/ConnectionConfiguration.java b/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/ConnectionConfiguration.java
new file mode 100644
index 0000000000..7381704ec9
--- /dev/null
+++ b/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/ConnectionConfiguration.java
@@ -0,0 +1,223 @@
+package org.opensearch.dataprepper.plugins.processor.oteltracegroup;
+
+import com.fasterxml.jackson.annotation.JsonClassDescription;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import jakarta.validation.constraints.AssertTrue;
+import jakarta.validation.constraints.NotEmpty;
+import jakarta.validation.constraints.Size;
+import software.amazon.awssdk.arns.Arn;
+
+import java.nio.file.Path;
+import java.time.temporal.ValueRange;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+@JsonPropertyOrder
+@JsonClassDescription("The otel_trace_group
processor completes missing trace-group-related fields in the " +
+ "collection of span " +
+ "records by looking up the OpenSearch backend. The otel_trace_group processor identifies the missing trace group information for a spanId by looking up the relevant fields in its root span
stored in OpenSearch.")
+public class ConnectionConfiguration {
+ private static final String AWS_IAM_ROLE = "role";
+ private static final String AWS_IAM = "iam";
+ private static final String DEFAULT_AWS_REGION = "us-east-1";
+ static final String AOS_SERVICE_NAME = "es";
+
+ public static final String HOSTS = "hosts";
+ public static final String USERNAME = "username";
+ public static final String PASSWORD = "password";
+ public static final String SOCKET_TIMEOUT = "socket_timeout";
+ public static final String CONNECT_TIMEOUT = "connect_timeout";
+ public static final String CERT_PATH = "cert";
+ public static final String INSECURE = "insecure";
+ public static final String AUTHENTICATION = "authentication";
+ public static final String AWS_OPTION = "aws";
+ public static final String AWS_SIGV4 = "aws_sigv4";
+ public static final String AWS_REGION = "aws_region";
+ public static final String AWS_STS_ROLE_ARN = "aws_sts_role_arn";
+ public static final String AWS_STS_EXTERNAL_ID = "aws_sts_external_id";
+ public static final String AWS_STS_HEADER_OVERRIDES = "aws_sts_header_overrides";
+ public static final String PROXY = "proxy";
+
+ /**
+ * The valid port range per https://tools.ietf.org/html/rfc6335.
+ */
+ static final ValueRange VALID_PORT_RANGE = ValueRange.of(0, 65535);
+
+ @NotEmpty
+ @JsonProperty(HOSTS)
+ @JsonPropertyDescription("A list of IP addresses of OpenSearch nodes.")
+ private List hosts;
+
+ @JsonProperty(USERNAME)
+ @JsonPropertyDescription("A string that contains the username and is used in the " +
+ "internal users YAML configuration file of your OpenSearch cluster.")
+ private String username;
+
+ @JsonProperty(PASSWORD)
+ @JsonPropertyDescription("A string that contains the password and is used in the " +
+ "internal users YAML configuration file of your OpenSearch cluster.")
+ private String password;
+
+ @JsonProperty(CERT_PATH)
+ @JsonPropertyDescription("A certificate authority (CA) certificate that is PEM encoded. Accepts both .pem or .crt. " +
+ "This enables the client to trust the CA that has signed the certificate that OpenSearch is using.")
+ private Path certPath;
+
+ @JsonProperty(SOCKET_TIMEOUT)
+ private Integer socketTimeout;
+
+ @JsonProperty(CONNECT_TIMEOUT)
+ private Integer connectTimeout;
+
+ @JsonProperty(INSECURE)
+ private boolean insecure;
+
+ @JsonProperty(AWS_SIGV4)
+ @JsonPropertyDescription("A Boolean flag used to sign the HTTP request with AWS credentials. " +
+ "Only applies to Amazon OpenSearch Service. See OpenSearch security for details.")
+ private boolean awsSigv4;
+
+ @JsonProperty(AWS_REGION)
+ @JsonPropertyDescription("A string that represents the AWS Region of the Amazon OpenSearch Service domain, " +
+ "for example, us-west-2
. Only applies to Amazon OpenSearch Service.")
+ private String awsRegion = DEFAULT_AWS_REGION;
+
+ @JsonProperty(AWS_STS_ROLE_ARN)
+ @JsonPropertyDescription("An AWS Identity and Access Management (IAM) role that the sink plugin assumes to sign the request to Amazon OpenSearch Service. " +
+ "If not provided, the plugin uses the default credentials.")
+ @Size(max = 2048, message = "aws_sts_role_arn length cannot exceed 2048")
+ private String awsStsRoleArn;
+
+ @JsonProperty(AWS_OPTION)
+ @JsonPropertyDescription("The AWS configuration.")
+ private AwsOption awsOption;
+
+ @JsonProperty(AWS_STS_EXTERNAL_ID)
+ @Size(max = 1224, message = "aws_sts_external_id length cannot exceed 1224")
+ @JsonPropertyDescription("The external ID to attach to AssumeRole requests from AWS STS.")
+ private String awsStsExternalId;
+
+ @JsonProperty(AWS_STS_HEADER_OVERRIDES)
+ @JsonPropertyDescription("A map of header overrides that the IAM role assumes for the plugin.")
+ private Map awsStsHeaderOverrides;
+
+ @JsonProperty(PROXY)
+ @JsonPropertyDescription("The address of the forward HTTP proxy server. " +
+ "The format is \"<hostname or IP>:<port>\" (for example, \"example.com:8100\", \"http://example.com:8100\", \"112.112.112.112:8100\"). " +
+ "The port number cannot be omitted.")
+ private String proxy;
+
+ @JsonProperty(AUTHENTICATION)
+ @JsonPropertyDescription("The basic authentication configuration.")
+ private AuthConfig authConfig;
+
+ List getHosts() {
+ return hosts;
+ }
+
+ String getUsername() {
+ return username;
+ }
+
+ String getPassword() {
+ return password;
+ }
+
+ boolean isAwsSigv4() {
+ return awsSigv4 || awsOption != null;
+ }
+
+ String getAwsRegion() {
+ return awsRegion;
+ }
+
+ String getAwsStsRoleArn() {
+ return awsStsRoleArn;
+ }
+
+ AwsOption getAwsOption() {
+ return awsOption;
+ }
+
+ Path getCertPath() {
+ return certPath;
+ }
+
+ String getProxy() {
+ return proxy;
+ }
+
+ Integer getSocketTimeout() {
+ return socketTimeout;
+ }
+
+ Integer getConnectTimeout() {
+ return connectTimeout;
+ }
+
+ public AuthConfig getAuthConfig() {
+ return authConfig;
+ }
+
+ public String getAwsStsExternalId() {
+ return awsStsExternalId;
+ }
+
+ public Map getAwsStsHeaderOverrides() {
+ return awsStsHeaderOverrides;
+ }
+
+ public boolean isInsecure() {
+ return insecure;
+ }
+
+ @AssertTrue(message = "Deprecated username and password should not be set " +
+ "when authentication is configured.")
+ boolean isValidAuthentication() {
+ if (authConfig != null) {
+ return username == null && password == null;
+ }
+ if (username != null || password != null) {
+ return authConfig == null;
+ }
+ return true;
+ }
+
+ @AssertTrue(message = "aws_sigv4 option cannot be used along with aws option.")
+ boolean isValidAwsAuth() {
+ if (awsOption != null) {
+ return !awsSigv4;
+ }
+ if (awsSigv4) {
+ return awsOption == null;
+ }
+ return true;
+ }
+
+ @AssertTrue(message = "sts_role_arn must be an null or a valid IAM role ARN.")
+ boolean isValidStsRoleArn() {
+ if (awsStsRoleArn == null) {
+ return true;
+ }
+ final Arn arn = getArn(awsStsRoleArn);
+ if (!AWS_IAM.equals(arn.service())) {
+ return false;
+ }
+ final Optional resourceType = arn.resource().resourceType();
+ if (resourceType.isEmpty() || !resourceType.get().equals(AWS_IAM_ROLE)) {
+ return false;
+ }
+ return true;
+ }
+
+ private Arn getArn(final String awsStsRoleArn) {
+ try {
+ return Arn.fromString(awsStsRoleArn);
+ } catch (final Exception e) {
+ throw new IllegalArgumentException(String.format("Invalid ARN format for awsStsRoleArn. Check the format of %s", awsStsRoleArn));
+ }
+ }
+}
diff --git a/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessor.java b/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessor.java
index cb7b875cbc..10a79b068d 100644
--- a/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessor.java
+++ b/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessor.java
@@ -6,9 +6,9 @@
package org.opensearch.dataprepper.plugins.processor.oteltracegroup;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
-import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
@@ -46,7 +46,8 @@
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
-@DataPrepperPlugin(name = "otel_trace_group", pluginType = Processor.class)
+@DataPrepperPlugin(name = "otel_trace_group", pluginType = Processor.class,
+ pluginConfigurationType = OTelTraceGroupProcessorConfig.class)
public class OTelTraceGroupProcessor extends AbstractProcessor, Record> {
public static final String RECORDS_IN_MISSING_TRACE_GROUP = "recordsInMissingTraceGroup";
@@ -63,10 +64,14 @@ public class OTelTraceGroupProcessor extends AbstractProcessor, Rec
private final Counter recordsOutMissingTraceGroupCounter;
@DataPrepperPluginConstructor
- public OTelTraceGroupProcessor(final PluginSetting pluginSetting, final AwsCredentialsSupplier awsCredentialsSupplier) {
- super(pluginSetting);
- otelTraceGroupProcessorConfig = OTelTraceGroupProcessorConfig.buildConfig(pluginSetting);
- restHighLevelClient = otelTraceGroupProcessorConfig.getEsConnectionConfig().createClient(awsCredentialsSupplier);
+ public OTelTraceGroupProcessor(final PluginMetrics pluginMetrics,
+ final OTelTraceGroupProcessorConfig otelTraceGroupProcessorConfig,
+ final AwsCredentialsSupplier awsCredentialsSupplier) {
+ super(pluginMetrics);
+ this.otelTraceGroupProcessorConfig = otelTraceGroupProcessorConfig;
+ final OpenSearchClientFactory openSearchClientFactory = OpenSearchClientFactory.fromConnectionConfiguration(
+ otelTraceGroupProcessorConfig.getEsConnectionConfig());
+ restHighLevelClient = openSearchClientFactory.createRestHighLevelClient(awsCredentialsSupplier);
recordsInMissingTraceGroupCounter = pluginMetrics.counter(RECORDS_IN_MISSING_TRACE_GROUP);
recordsOutFixedTraceGroupCounter = pluginMetrics.counter(RECORDS_OUT_FIXED_TRACE_GROUP);
diff --git a/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessorConfig.java b/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessorConfig.java
index cf2aa24a25..45346f98df 100644
--- a/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessorConfig.java
+++ b/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessorConfig.java
@@ -1,12 +1,6 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
package org.opensearch.dataprepper.plugins.processor.oteltracegroup;
-import org.opensearch.dataprepper.model.configuration.PluginSetting;
-import org.opensearch.dataprepper.plugins.sink.opensearch.ConnectionConfiguration;
+import com.fasterxml.jackson.annotation.JsonUnwrapped;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType;
@@ -17,18 +11,10 @@ public class OTelTraceGroupProcessorConfig {
protected static final String RAW_INDEX_ALIAS = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW);
protected static final String STRICT_DATE_TIME = "strict_date_time";
- private final ConnectionConfiguration esConnectionConfig;
+ @JsonUnwrapped
+ private ConnectionConfiguration esConnectionConfig;
public ConnectionConfiguration getEsConnectionConfig() {
return esConnectionConfig;
}
-
- private OTelTraceGroupProcessorConfig(final ConnectionConfiguration esConnectionConfig) {
- this.esConnectionConfig = esConnectionConfig;
- }
-
- public static OTelTraceGroupProcessorConfig buildConfig(final PluginSetting pluginSetting) {
- final ConnectionConfiguration esConnectionConfig = ConnectionConfiguration.readConnectionConfiguration(pluginSetting);
- return new OTelTraceGroupProcessorConfig(esConnectionConfig);
- }
}
diff --git a/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OpenSearchClientFactory.java b/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OpenSearchClientFactory.java
new file mode 100644
index 0000000000..ba990ac5eb
--- /dev/null
+++ b/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OpenSearchClientFactory.java
@@ -0,0 +1,204 @@
+package org.opensearch.dataprepper.plugins.processor.oteltracegroup;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequestInterceptor;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.TrustAllStrategy;
+import org.apache.http.conn.ssl.TrustStrategy;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
+import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
+import org.opensearch.dataprepper.aws.api.AwsRequestSigningApache4Interceptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.signer.Aws4Signer;
+
+import javax.net.ssl.SSLContext;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.opensearch.dataprepper.plugins.processor.oteltracegroup.ConnectionConfiguration.AOS_SERVICE_NAME;
+import static org.opensearch.dataprepper.plugins.processor.oteltracegroup.ConnectionConfiguration.AWS_SIGV4;
+import static org.opensearch.dataprepper.plugins.processor.oteltracegroup.ConnectionConfiguration.VALID_PORT_RANGE;
+
+public class OpenSearchClientFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(OpenSearchClientFactory.class);
+
+ private final ConnectionConfiguration connectionConfiguration;
+
+ public static OpenSearchClientFactory fromConnectionConfiguration(
+ final ConnectionConfiguration connectionConfiguration) {
+ return new OpenSearchClientFactory(connectionConfiguration);
+ }
+
+ OpenSearchClientFactory(final ConnectionConfiguration connectionConfiguration) {
+ this.connectionConfiguration = connectionConfiguration;
+ }
+
+ public RestHighLevelClient createRestHighLevelClient(final AwsCredentialsSupplier awsCredentialsSupplier) {
+ final HttpHost[] httpHosts = new HttpHost[connectionConfiguration.getHosts().size()];
+ int i = 0;
+ for (final String host : connectionConfiguration.getHosts()) {
+ httpHosts[i] = HttpHost.create(host);
+ i++;
+ }
+ final RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
+ /*
+ * Given that this is a patch release, we will support only the IAM based access policy AES domains.
+ * We will not support FGAC and Custom endpoint domains. This will be followed in the next version.
+ */
+ if (connectionConfiguration.isAwsSigv4()) {
+ attachSigV4(restClientBuilder, awsCredentialsSupplier);
+ } else {
+ attachUserCredentials(restClientBuilder);
+ }
+ restClientBuilder.setRequestConfigCallback(
+ requestConfigBuilder -> {
+ if (connectionConfiguration.getConnectTimeout() != null) {
+ requestConfigBuilder.setConnectTimeout(connectionConfiguration.getConnectTimeout());
+ }
+ if (connectionConfiguration.getSocketTimeout() != null) {
+ requestConfigBuilder.setSocketTimeout(connectionConfiguration.getSocketTimeout());
+ }
+ return requestConfigBuilder;
+ });
+ return new RestHighLevelClient(restClientBuilder);
+ }
+
+ private void attachSigV4(final RestClientBuilder restClientBuilder, AwsCredentialsSupplier awsCredentialsSupplier) {
+ final String awsRegion = connectionConfiguration.getAwsOption() == null ?
+ connectionConfiguration.getAwsRegion() : connectionConfiguration.getAwsOption().getRegion();
+ //if aws signing is enabled we will add AWSRequestSigningApacheInterceptor interceptor,
+ //if not follow regular credentials process
+ LOG.info("{} is set, will sign requests using AWSRequestSigningApacheInterceptor", AWS_SIGV4);
+ final Aws4Signer aws4Signer = Aws4Signer.create();
+ final AwsCredentialsOptions awsCredentialsOptions = createAwsCredentialsOptions();
+ final AwsCredentialsProvider credentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions);
+ final HttpRequestInterceptor httpRequestInterceptor = new AwsRequestSigningApache4Interceptor(AOS_SERVICE_NAME, aws4Signer,
+ credentialsProvider, awsRegion);
+ restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
+ httpClientBuilder.addInterceptorLast(httpRequestInterceptor);
+ attachSSLContext(httpClientBuilder);
+ setHttpProxyIfApplicable(httpClientBuilder);
+ return httpClientBuilder;
+ });
+ }
+
+ public AwsCredentialsOptions createAwsCredentialsOptions() {
+ final String awsStsRoleArn = connectionConfiguration.getAwsOption() == null ?
+ connectionConfiguration.getAwsStsRoleArn() : connectionConfiguration.getAwsOption().getStsRoleArn();
+ final String awsStsExternalId = connectionConfiguration.getAwsOption() == null ?
+ connectionConfiguration.getAwsStsExternalId() :
+ connectionConfiguration.getAwsOption().getStsExternalId();
+ final String awsRegion = connectionConfiguration.getAwsOption() == null ?
+ connectionConfiguration.getAwsRegion() : connectionConfiguration.getAwsOption().getRegion();
+ final Map awsStsHeaderOverrides = connectionConfiguration.getAwsOption() == null ?
+ connectionConfiguration.getAwsStsHeaderOverrides() :
+ connectionConfiguration.getAwsOption().getStsHeaderOverrides();
+ final AwsCredentialsOptions awsCredentialsOptions = AwsCredentialsOptions.builder()
+ .withStsRoleArn(awsStsRoleArn)
+ .withStsExternalId(awsStsExternalId)
+ .withRegion(awsRegion)
+ .withStsHeaderOverrides(awsStsHeaderOverrides)
+ .build();
+ return awsCredentialsOptions;
+ }
+
+ private void attachUserCredentials(final RestClientBuilder restClientBuilder) {
+ final AuthConfig authConfig = connectionConfiguration.getAuthConfig();
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ if (authConfig != null) {
+ if (authConfig.getUsername() != null) {
+ LOG.info("Using the authentication provided in the config.");
+ credentialsProvider.setCredentials(
+ AuthScope.ANY, new UsernamePasswordCredentials(authConfig.getUsername(), authConfig.getPassword()));
+ }
+ } else {
+ final String username = connectionConfiguration.getUsername();
+ final String password = connectionConfiguration.getPassword();
+ if (username != null) {
+ LOG.info("Using the username provided in the config.");
+ credentialsProvider.setCredentials(
+ AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+ }
+ }
+ restClientBuilder.setHttpClientConfigCallback(
+ httpClientBuilder -> {
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+ attachSSLContext(httpClientBuilder);
+ setHttpProxyIfApplicable(httpClientBuilder);
+ return httpClientBuilder;
+ }
+ );
+ }
+
+ private void setHttpProxyIfApplicable(final HttpAsyncClientBuilder httpClientBuilder) {
+ Optional.ofNullable(connectionConfiguration.getProxy()).ifPresent(
+ p -> {
+ final HttpHost httpProxyHost = HttpHost.create(p);
+ checkProxyPort(httpProxyHost.getPort());
+ httpClientBuilder.setProxy(httpProxyHost);
+ }
+ );
+ }
+
+ private void checkProxyPort(final int port) {
+ if (!VALID_PORT_RANGE.isValidIntValue(port)) {
+ throw new IllegalArgumentException("Invalid or missing proxy port.");
+ }
+ }
+
+ private void attachSSLContext(final HttpAsyncClientBuilder httpClientBuilder) {
+ final Path certPath = connectionConfiguration.getCertPath();
+ final SSLContext sslContext = certPath != null ? getCAStrategy(certPath) : getTrustAllStrategy();
+ httpClientBuilder.setSSLContext(sslContext);
+ if (connectionConfiguration.isInsecure()) {
+ httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
+ }
+ }
+
+ private SSLContext getCAStrategy(Path certPath) {
+ LOG.info("Using the cert provided in the config.");
+ try {
+ CertificateFactory factory = CertificateFactory.getInstance("X.509");
+ Certificate trustedCa;
+ try (InputStream is = Files.newInputStream(certPath)) {
+ trustedCa = factory.generateCertificate(is);
+ }
+ KeyStore trustStore = KeyStore.getInstance("pkcs12");
+ trustStore.load(null, null);
+ trustStore.setCertificateEntry("ca", trustedCa);
+ SSLContextBuilder sslContextBuilder = SSLContexts.custom()
+ .loadTrustMaterial(trustStore, null);
+ return sslContextBuilder.build();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex.getMessage(), ex);
+ }
+ }
+
+ private SSLContext getTrustAllStrategy() {
+ LOG.info("Using the trust all strategy");
+ final TrustStrategy trustStrategy = new TrustAllStrategy();
+ try {
+ return SSLContexts.custom().loadTrustMaterial(null, trustStrategy).build();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex.getMessage(), ex);
+ }
+ }
+}
diff --git a/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/ServerlessOptions.java b/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/ServerlessOptions.java
new file mode 100644
index 0000000000..1b3858d019
--- /dev/null
+++ b/data-prepper-plugins/otel-trace-group-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/ServerlessOptions.java
@@ -0,0 +1,45 @@
+package org.opensearch.dataprepper.plugins.processor.oteltracegroup;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+
+public class ServerlessOptions {
+ public static final String COLLECTION_NAME = "collection_name";
+ public static final String NETWORK_POLICY_NAME = "network_policy_name";
+ public static final String VPCE_ID = "vpce_id";
+
+ @JsonProperty(NETWORK_POLICY_NAME)
+ @JsonPropertyDescription("The name of the network policy to create.")
+ private String networkPolicyName;
+
+ @JsonProperty(COLLECTION_NAME)
+ @JsonPropertyDescription("The name of the Amazon OpenSearch Serverless collection to configure.")
+ private String collectionName;
+
+ @JsonProperty(VPCE_ID)
+ @JsonPropertyDescription("The virtual private cloud (VPC) endpoint to which the source connects.")
+ private String vpceId;
+
+ public ServerlessOptions() {
+
+ }
+
+ public ServerlessOptions(String networkPolicyName, String collectionName, String vpceId) {
+ this.networkPolicyName = networkPolicyName;
+ this.collectionName = collectionName;
+ this.vpceId = vpceId;
+ }
+
+ public String getNetworkPolicyName() {
+ return networkPolicyName;
+ }
+
+ public String getCollectionName() {
+ return collectionName;
+ }
+
+ public String getVpceId() {
+ return vpceId;
+ }
+
+}
diff --git a/data-prepper-plugins/otel-trace-group-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/ConnectionConfigurationTest.java b/data-prepper-plugins/otel-trace-group-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/ConnectionConfigurationTest.java
new file mode 100644
index 0000000000..6d37bf5519
--- /dev/null
+++ b/data-prepper-plugins/otel-trace-group-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/ConnectionConfigurationTest.java
@@ -0,0 +1,196 @@
+package org.opensearch.dataprepper.plugins.processor.oteltracegroup;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ConnectionConfigurationTest {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private final List TEST_HOSTS = Collections.singletonList("http://localhost:9200");
+ private final String TEST_USERNAME = "admin";
+ private final String TEST_PASSWORD = "admin";
+ private final String TEST_PIPELINE_NAME = "Test-Pipeline";
+ private final Integer TEST_CONNECT_TIMEOUT = 5;
+ private final Integer TEST_SOCKET_TIMEOUT = 10;
+ private final String TEST_CERT_PATH = Objects.requireNonNull(getClass().getClassLoader().getResource("test-ca.pem")).getFile();
+ private final String TEST_ROLE = "arn:aws:iam::123456789012:role/test-role";
+
+ @Test
+ void testDeserializeConnectionConfigurationDefault() {
+ final Map pluginSetting = generateConfigurationMetadata(
+ TEST_HOSTS, null, null, null, null, false, null, null, null, false);
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ pluginSetting, ConnectionConfiguration.class);
+ assertEquals(TEST_HOSTS, connectionConfiguration.getHosts());
+ assertNull(connectionConfiguration.getUsername());
+ assertNull(connectionConfiguration.getPassword());
+ assertFalse(connectionConfiguration.isAwsSigv4());
+ assertNull(connectionConfiguration.getCertPath());
+ assertNull(connectionConfiguration.getConnectTimeout());
+ assertNull(connectionConfiguration.getSocketTimeout());
+ }
+
+ @Test
+ void testDeserializeConnectionConfigurationWithAwsSigV4() {
+ final String stsRoleArn = "arn:aws:iam::123456789012:role/TestRole";
+ final Map pluginSetting = generateConfigurationMetadata(
+ TEST_HOSTS, null, null, null, null, true, null, stsRoleArn, null, false);
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ pluginSetting, ConnectionConfiguration.class);
+ assertEquals(TEST_HOSTS, connectionConfiguration.getHosts());
+ assertNull(connectionConfiguration.getUsername());
+ assertNull(connectionConfiguration.getPassword());
+ assertTrue(connectionConfiguration.isAwsSigv4());
+ assertNull(connectionConfiguration.getCertPath());
+ assertNull(connectionConfiguration.getConnectTimeout());
+ assertNull(connectionConfiguration.getSocketTimeout());
+ assertTrue(connectionConfiguration.isValidAwsAuth());
+ assertTrue(connectionConfiguration.isValidStsRoleArn());
+ }
+
+ @Test
+ void testDeserializeConnectionConfigurationAwsOptionServerlessDefault() {
+ final String testArn = TEST_ROLE;
+ final Map configMetadata = generateConfigurationMetadataWithAwsOption(TEST_HOSTS, null, null, null, null, true, false, null, testArn, null, TEST_CERT_PATH, false, Collections.emptyMap());
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ configMetadata, ConnectionConfiguration.class);
+ assertTrue(connectionConfiguration.isValidAwsAuth());
+ assertTrue(connectionConfiguration.getAwsOption().isServerless());
+ assertTrue(connectionConfiguration.getAwsOption().isValidStsRoleArn());
+ }
+
+ @Test
+ void testDeserializeConnectionConfigurationWithDeprecatedBasicCredentialsAndNoCert() {
+ final Map pluginSetting = generateConfigurationMetadata(
+ TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, false);
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ pluginSetting, ConnectionConfiguration.class);
+ assertEquals(TEST_HOSTS, connectionConfiguration.getHosts());
+ assertEquals(TEST_USERNAME, connectionConfiguration.getUsername());
+ assertEquals(TEST_PASSWORD, connectionConfiguration.getPassword());
+ assertEquals(TEST_CONNECT_TIMEOUT, connectionConfiguration.getConnectTimeout());
+ assertEquals(TEST_SOCKET_TIMEOUT, connectionConfiguration.getSocketTimeout());
+ assertFalse(connectionConfiguration.isAwsSigv4());
+ }
+
+ @Test
+ void testDeserializeConnectionConfigurationWithBasicCredentialsAndNoCert() {
+ final Map configurationMetadata = generateConfigurationMetadata(
+ TEST_HOSTS, null, null, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, false);
+ configurationMetadata.put("authentication", Map.of("username", TEST_USERNAME, "password", TEST_PASSWORD));
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ configurationMetadata, ConnectionConfiguration.class);
+ assertEquals(TEST_HOSTS, connectionConfiguration.getHosts());
+ assertNull(connectionConfiguration.getUsername());
+ assertNull(connectionConfiguration.getPassword());
+ assertNotNull(connectionConfiguration.getAuthConfig());
+ assertEquals(TEST_USERNAME, connectionConfiguration.getAuthConfig().getUsername());
+ assertEquals(TEST_PASSWORD, connectionConfiguration.getAuthConfig().getPassword());
+ assertEquals(TEST_CONNECT_TIMEOUT, connectionConfiguration.getConnectTimeout());
+ assertEquals(TEST_SOCKET_TIMEOUT, connectionConfiguration.getSocketTimeout());
+ assertFalse(connectionConfiguration.isAwsSigv4());
+ }
+
+ @Test
+ void testDeserializeConnectionConfigurationWithBothDeprecatedBasicCredentialsAndAuthConfig() {
+ final Map configurationMetadata = generateConfigurationMetadata(
+ TEST_HOSTS, TEST_USERNAME, null, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, false);
+ configurationMetadata.put("authentication", Map.of("username", TEST_USERNAME, "password", TEST_PASSWORD));
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ configurationMetadata, ConnectionConfiguration.class);
+ assertFalse(connectionConfiguration.isValidAuthentication());
+ }
+
+ @Test
+ void testServerlessOptions() {
+ final String serverlessNetworkPolicyName = UUID.randomUUID().toString();
+ final String serverlessCollectionName = UUID.randomUUID().toString();
+ final String serverlessVpceId = UUID.randomUUID().toString();
+
+ final Map metadata = new HashMap<>();
+ final Map awsOptionMetadata = new HashMap<>();
+ final Map serverlessOptionsMetadata = new HashMap<>();
+ serverlessOptionsMetadata.put("network_policy_name", serverlessNetworkPolicyName);
+ serverlessOptionsMetadata.put("collection_name", serverlessCollectionName);
+ serverlessOptionsMetadata.put("vpce_id", serverlessVpceId);
+ awsOptionMetadata.put("region", UUID.randomUUID().toString());
+ awsOptionMetadata.put("serverless", true);
+ awsOptionMetadata.put("serverless_options", serverlessOptionsMetadata);
+ awsOptionMetadata.put("sts_role_arn", TEST_ROLE);
+ metadata.put("hosts", TEST_HOSTS);
+ metadata.put("username", UUID.randomUUID().toString());
+ metadata.put("password", UUID.randomUUID().toString());
+ metadata.put("connect_timeout", 1);
+ metadata.put("socket_timeout", 1);
+ metadata.put("aws", awsOptionMetadata);
+
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ metadata, ConnectionConfiguration.class);
+ assertThat(connectionConfiguration.getAwsOption().isServerless(), is(true));
+ assertThat(connectionConfiguration.getAwsOption().getServerlessOptions().getNetworkPolicyName(),
+ equalTo(serverlessNetworkPolicyName));
+ assertThat(connectionConfiguration.getAwsOption().getServerlessOptions().getCollectionName(),
+ equalTo(serverlessCollectionName));
+ assertThat(connectionConfiguration.getAwsOption().getServerlessOptions().getVpceId(),
+ equalTo(serverlessVpceId));
+ }
+
+ private Map generateConfigurationMetadata(
+ final List hosts, final String username, final String password,
+ final Integer connectTimeout, final Integer socketTimeout, final boolean awsSigv4, final String awsRegion,
+ final String awsStsRoleArn, final String certPath, final boolean insecure) {
+ final Map metadata = new HashMap<>();
+ metadata.put("hosts", hosts);
+ metadata.put("username", username);
+ metadata.put("password", password);
+ metadata.put("connect_timeout", connectTimeout);
+ metadata.put("socket_timeout", socketTimeout);
+ metadata.put("aws_sigv4", awsSigv4);
+ if (awsRegion != null) {
+ metadata.put("aws_region", awsRegion);
+ }
+ metadata.put("aws_sts_role_arn", awsStsRoleArn);
+ metadata.put("cert", certPath);
+ metadata.put("insecure", insecure);
+ return metadata;
+ }
+
+ private Map generateConfigurationMetadataWithAwsOption(
+ final List hosts, final String username, final String password,
+ final Integer connectTimeout, final Integer socketTimeout, final boolean serverless, final boolean awsSigv4, final String awsRegion,
+ final String awsStsRoleArn, final String awsStsExternalId, final String certPath, final boolean insecure, Map headerOverridesMap) {
+ final Map metadata = new HashMap<>();
+ final Map awsOptionMetadata = new HashMap<>();
+ metadata.put("hosts", hosts);
+ metadata.put("username", username);
+ metadata.put("password", password);
+ metadata.put("connect_timeout", connectTimeout);
+ metadata.put("socket_timeout", socketTimeout);
+ if (awsRegion != null) {
+ awsOptionMetadata.put("region", awsRegion);
+ }
+ awsOptionMetadata.put("serverless", serverless);
+ awsOptionMetadata.put("sts_role_arn", awsStsRoleArn);
+ awsOptionMetadata.put("sts_external_id", awsStsExternalId);
+ awsOptionMetadata.put("sts_header_overrides", headerOverridesMap);
+ metadata.put("aws", awsOptionMetadata);
+ metadata.put("cert", certPath);
+ metadata.put("insecure", insecure);
+ return metadata;
+ }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/otel-trace-group-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessorConfigTest.java b/data-prepper-plugins/otel-trace-group-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessorConfigTest.java
new file mode 100644
index 0000000000..0c9cf23763
--- /dev/null
+++ b/data-prepper-plugins/otel-trace-group-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessorConfigTest.java
@@ -0,0 +1,28 @@
+package org.opensearch.dataprepper.plugins.processor.oteltracegroup;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.opensearch.dataprepper.plugins.processor.oteltracegroup.ConnectionConfiguration.HOSTS;
+
+class OTelTraceGroupProcessorConfigTest {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private final List TEST_HOSTS = Collections.singletonList("http://localhost:9200");
+
+ @Test
+ void testDeserialize() {
+ final Map pluginSetting = Map.of(HOSTS, TEST_HOSTS);
+ final OTelTraceGroupProcessorConfig objectUnderTest = OBJECT_MAPPER.convertValue(
+ pluginSetting, OTelTraceGroupProcessorConfig.class);
+ assertThat(objectUnderTest, notNullValue());
+ assertThat(objectUnderTest.getEsConnectionConfig(), notNullValue());
+ assertThat(objectUnderTest.getEsConnectionConfig().getHosts(), equalTo(TEST_HOSTS));
+ }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/otel-trace-group-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessorConfigTests.java b/data-prepper-plugins/otel-trace-group-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessorConfigTests.java
deleted file mode 100644
index 3d89dad5d7..0000000000
--- a/data-prepper-plugins/otel-trace-group-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessorConfigTests.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.dataprepper.plugins.processor.oteltracegroup;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.MockedStatic;
-import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.opensearch.dataprepper.model.configuration.PluginSetting;
-import org.opensearch.dataprepper.plugins.sink.opensearch.ConnectionConfiguration;
-
-import java.util.HashMap;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-
-@ExtendWith(MockitoExtension.class)
-class OTelTraceGroupProcessorConfigTests {
-
- @Mock
- private ConnectionConfiguration connectionConfigurationMock;
-
- @Test
- void testInitialize() {
- try (MockedStatic connectionConfigurationMockedStatic = Mockito.mockStatic(ConnectionConfiguration.class)) {
- connectionConfigurationMockedStatic.when(() -> ConnectionConfiguration.readConnectionConfiguration(any(PluginSetting.class)))
- .thenReturn(connectionConfigurationMock);
- PluginSetting testPluginSetting = new PluginSetting("otel_trace_group", new HashMap<>());
- OTelTraceGroupProcessorConfig otelTraceGroupProcessorConfig = OTelTraceGroupProcessorConfig.buildConfig(testPluginSetting);
- assertEquals(connectionConfigurationMock, otelTraceGroupProcessorConfig.getEsConnectionConfig());
- }
- }
-}
diff --git a/data-prepper-plugins/otel-trace-group-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessorTests.java b/data-prepper-plugins/otel-trace-group-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessorTests.java
index 1eae1beceb..3881d772e6 100644
--- a/data-prepper-plugins/otel-trace-group-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessorTests.java
+++ b/data-prepper-plugins/otel-trace-group-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OTelTraceGroupProcessorTests.java
@@ -25,13 +25,13 @@
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.trace.DefaultTraceGroupFields;
import org.opensearch.dataprepper.model.trace.JacksonSpan;
import org.opensearch.dataprepper.model.trace.Span;
import org.opensearch.dataprepper.plugins.processor.oteltracegroup.model.TraceGroup;
-import org.opensearch.dataprepper.plugins.sink.opensearch.ConnectionConfiguration;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
@@ -90,7 +90,7 @@ class OTelTraceGroupProcessorTests {
private static final String TEST_RAW_SPAN_MISSING_TRACE_GROUP_JSON_FILE_2 = "raw-span-missing-trace-group-2.json";
private static final int TEST_NUM_WORKERS = 2;
- private MockedStatic connectionConfigurationMockedStatic;
+ private MockedStatic openSearchClientFactoryMockedStatic;
private OTelTraceGroupProcessor otelTraceGroupProcessor;
private ExecutorService executorService;
@@ -98,6 +98,12 @@ class OTelTraceGroupProcessorTests {
@Mock
private ConnectionConfiguration connectionConfigurationMock;
+ @Mock
+ private OpenSearchClientFactory openSearchClientFactory;
+
+ @Mock
+ private OTelTraceGroupProcessorConfig otelTraceGroupProcessorConfig;
+
@Mock(lenient = true)
private RestHighLevelClient restHighLevelClient;
@@ -119,10 +125,12 @@ class OTelTraceGroupProcessorTests {
@BeforeEach
void setUp() throws Exception{
MetricsTestUtil.initMetrics();
- connectionConfigurationMockedStatic = Mockito.mockStatic(ConnectionConfiguration.class);
- connectionConfigurationMockedStatic.when(() -> ConnectionConfiguration.readConnectionConfiguration(any(PluginSetting.class)))
- .thenReturn(connectionConfigurationMock);
- when(connectionConfigurationMock.createClient(awsCredentialsSupplier)).thenReturn(restHighLevelClient);
+ openSearchClientFactoryMockedStatic = Mockito.mockStatic(OpenSearchClientFactory.class);
+ openSearchClientFactoryMockedStatic.when(() -> OpenSearchClientFactory.fromConnectionConfiguration(
+ any(ConnectionConfiguration.class)))
+ .thenReturn(openSearchClientFactory);
+ when(otelTraceGroupProcessorConfig.getEsConnectionConfig()).thenReturn(connectionConfigurationMock);
+ when(openSearchClientFactory.createRestHighLevelClient(awsCredentialsSupplier)).thenReturn(restHighLevelClient);
when(restHighLevelClient.search(any(SearchRequest.class), any(RequestOptions.class))).thenReturn(testSearchResponse);
doNothing().when(restHighLevelClient).close();
when(testSearchResponse.getHits()).thenReturn(testSearchHits);
@@ -155,14 +163,16 @@ void setUp() throws Exception{
final PluginSetting testPluginSetting = mock(PluginSetting.class);
when(testPluginSetting.getName()).thenReturn(PLUGIN_NAME);
when(testPluginSetting.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);
- otelTraceGroupProcessor = new OTelTraceGroupProcessor(testPluginSetting, awsCredentialsSupplier);
+ final PluginMetrics pluginMetrics = PluginMetrics.fromPluginSetting(testPluginSetting);
+ otelTraceGroupProcessor = new OTelTraceGroupProcessor(
+ pluginMetrics, otelTraceGroupProcessorConfig, awsCredentialsSupplier);
executorService = Executors.newFixedThreadPool(TEST_NUM_WORKERS);
}
@AfterEach
void tearDown() {
otelTraceGroupProcessor.shutdown();
- connectionConfigurationMockedStatic.close();
+ openSearchClientFactoryMockedStatic.close();
executorService.shutdown();
}
diff --git a/data-prepper-plugins/otel-trace-group-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OpenSearchClientFactoryTest.java b/data-prepper-plugins/otel-trace-group-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OpenSearchClientFactoryTest.java
new file mode 100644
index 0000000000..4e53dac060
--- /dev/null
+++ b/data-prepper-plugins/otel-trace-group-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/oteltracegroup/OpenSearchClientFactoryTest.java
@@ -0,0 +1,346 @@
+package org.opensearch.dataprepper.plugins.processor.oteltracegroup;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
+import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.opensearch.dataprepper.plugins.processor.oteltracegroup.ConnectionConfiguration.PROXY;
+
+@ExtendWith(MockitoExtension.class)
+class OpenSearchClientFactoryTest {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private final List TEST_HOSTS = Collections.singletonList("http://localhost:9200");
+ private final String TEST_USERNAME = "admin";
+ private final String TEST_PASSWORD = "admin";
+ private final Integer TEST_CONNECT_TIMEOUT = 5;
+ private final Integer TEST_SOCKET_TIMEOUT = 10;
+ private final String TEST_CERT_PATH = Objects.requireNonNull(getClass().getClassLoader().getResource("test-ca.pem")).getFile();
+ private final String TEST_ROLE = "arn:aws:iam::123456789012:role/test-role";
+
+ @Mock
+ private AwsCredentialsSupplier awsCredentialsSupplier;
+
+ private OpenSearchClientFactory objectUnderTest;
+
+ @Test
+ void testcreateRestHighLevelClientDefault() throws IOException {
+ final Map pluginSetting = generateConfigurationMetadata(
+ TEST_HOSTS, null, null, null, null, false, null, null, null, false);
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ pluginSetting, ConnectionConfiguration.class);
+ objectUnderTest = OpenSearchClientFactory.fromConnectionConfiguration(connectionConfiguration);
+ final RestHighLevelClient client = objectUnderTest.createRestHighLevelClient(awsCredentialsSupplier);
+ assertNotNull(client);
+ client.close();
+ }
+
+ @Test
+ void testcreateRestHighLevelClientWithDeprecatedBasicCredentialsAndNoCert() throws IOException {
+ final Map pluginSetting = generateConfigurationMetadata(
+ TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, false);
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ pluginSetting, ConnectionConfiguration.class);
+ objectUnderTest = OpenSearchClientFactory.fromConnectionConfiguration(connectionConfiguration);
+ final RestHighLevelClient client = objectUnderTest.createRestHighLevelClient(awsCredentialsSupplier);
+ assertNotNull(client);
+ client.close();
+ }
+
+ @Test
+ void testcreateRestHighLevelClientWithBasicCredentialsAndNoCert() throws IOException {
+ final Map configurationMetadata = generateConfigurationMetadata(
+ TEST_HOSTS, null, null, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, false);
+ configurationMetadata.put("authentication", Map.of("username", TEST_USERNAME, "password", TEST_PASSWORD));
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ configurationMetadata, ConnectionConfiguration.class);
+ objectUnderTest = OpenSearchClientFactory.fromConnectionConfiguration(connectionConfiguration);
+ final RestHighLevelClient client = objectUnderTest.createRestHighLevelClient(awsCredentialsSupplier);
+ assertNotNull(client);
+ client.close();
+ }
+
+ @Test
+ void testcreateRestHighLevelClientInsecure() throws IOException {
+ final Map pluginSetting = generateConfigurationMetadata(
+ TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, true);
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ pluginSetting, ConnectionConfiguration.class);
+ objectUnderTest = OpenSearchClientFactory.fromConnectionConfiguration(connectionConfiguration);
+ final RestHighLevelClient client = objectUnderTest.createRestHighLevelClient(awsCredentialsSupplier);
+ assertNotNull(client);
+ client.close();
+ }
+
+ @Test
+ void testcreateRestHighLevelClientWithCertPath() throws IOException {
+ final Map pluginSetting = generateConfigurationMetadata(
+ TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false);
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ pluginSetting, ConnectionConfiguration.class);
+ objectUnderTest = OpenSearchClientFactory.fromConnectionConfiguration(connectionConfiguration);
+ final RestHighLevelClient client = objectUnderTest.createRestHighLevelClient(awsCredentialsSupplier);
+ assertNotNull(client);
+ client.close();
+ }
+
+ @Test
+ void testcreateRestHighLevelClientWithAWSSigV4AndSTSRole() {
+ final Map pluginSetting = generateConfigurationMetadata(
+ TEST_HOSTS, null, null, null, null, true, null, TEST_ROLE, TEST_CERT_PATH, false);
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ pluginSetting, ConnectionConfiguration.class);
+ assertThat(connectionConfiguration, notNullValue());
+ assertThat(connectionConfiguration.getAwsRegion(), equalTo("us-east-1"));
+ assertThat(connectionConfiguration.isAwsSigv4(), equalTo(true));
+ assertThat(connectionConfiguration.getAwsStsRoleArn(), equalTo(TEST_ROLE));
+
+ final AwsCredentialsProvider awsCredentialsProvider = mock(AwsCredentialsProvider.class);
+ when(awsCredentialsSupplier.getProvider(any())).thenReturn(awsCredentialsProvider);
+
+ objectUnderTest = OpenSearchClientFactory.fromConnectionConfiguration(connectionConfiguration);
+ objectUnderTest.createRestHighLevelClient(awsCredentialsSupplier);
+
+ final ArgumentCaptor awsCredentialsOptionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class);
+ verify(awsCredentialsSupplier).getProvider(awsCredentialsOptionsArgumentCaptor.capture());
+ final AwsCredentialsOptions actualOptions = awsCredentialsOptionsArgumentCaptor.getValue();
+
+ assertThat(actualOptions.getStsRoleArn(), equalTo(TEST_ROLE));
+ assertThat(actualOptions.getStsHeaderOverrides(), notNullValue());
+ assertThat(actualOptions.getStsHeaderOverrides().size(), equalTo(0));
+ }
+
+ @Test
+ void testcreateRestHighLevelClientWithAWSOption() {
+ final String headerName1 = UUID.randomUUID().toString();
+ final String headerValue1 = UUID.randomUUID().toString();
+ final String headerName2 = UUID.randomUUID().toString();
+ final String headerValue2 = UUID.randomUUID().toString();
+ final String testArn = TEST_ROLE;
+ final String externalId = UUID.randomUUID().toString();
+ final Map configurationMetadata = generateConfigurationMetadataWithAwsOption(TEST_HOSTS, null, null, null, null, false, true, null, testArn, externalId, null,false, Map.of(headerName1, headerValue1, headerName2, headerValue2));
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ configurationMetadata, ConnectionConfiguration.class);
+
+ assertThat(connectionConfiguration, notNullValue());
+ assertThat(connectionConfiguration.isAwsSigv4(), equalTo(true));
+
+ final AwsCredentialsProvider awsCredentialsProvider = mock(AwsCredentialsProvider.class);
+ when(awsCredentialsSupplier.getProvider(any())).thenReturn(awsCredentialsProvider);
+
+ objectUnderTest = OpenSearchClientFactory.fromConnectionConfiguration(connectionConfiguration);
+ objectUnderTest.createRestHighLevelClient(awsCredentialsSupplier);
+
+ final ArgumentCaptor awsCredentialsOptionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class);
+ verify(awsCredentialsSupplier).getProvider(awsCredentialsOptionsArgumentCaptor.capture());
+ final AwsCredentialsOptions actualOptions = awsCredentialsOptionsArgumentCaptor.getValue();
+
+ assertThat(actualOptions.getStsRoleArn(), equalTo(TEST_ROLE));
+ assertThat(actualOptions.getStsExternalId(), equalTo(externalId));
+ assertThat(actualOptions.getStsHeaderOverrides(), notNullValue());
+ assertThat(actualOptions.getStsHeaderOverrides().size(), equalTo(2));
+ assertThat(actualOptions.getStsHeaderOverrides(), hasKey(headerName1));
+ assertThat(actualOptions.getStsHeaderOverrides().get(headerName1), equalTo(headerValue1));
+ assertThat(actualOptions.getStsHeaderOverrides(), hasKey(headerName2));
+ assertThat(actualOptions.getStsHeaderOverrides().get(headerName2), equalTo(headerValue2));
+ }
+
+ @Test
+ void testcreateRestHighLevelClientWithAWSSigV4AndHeaderOverrides() {
+ final String headerName1 = UUID.randomUUID().toString();
+ final String headerValue1 = UUID.randomUUID().toString();
+ final String headerName2 = UUID.randomUUID().toString();
+ final String headerValue2 = UUID.randomUUID().toString();
+ final Map configurationMetadata = generateConfigurationMetadata(TEST_HOSTS, null, null, null, null, true, null, TEST_ROLE, TEST_CERT_PATH, false);
+ configurationMetadata.put("aws_sts_header_overrides", Map.of(headerName1, headerValue1, headerName2, headerValue2));
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ configurationMetadata, ConnectionConfiguration.class);
+
+ assertThat(connectionConfiguration, notNullValue());
+ assertThat(connectionConfiguration.isAwsSigv4(), equalTo(true));
+
+ final AwsCredentialsProvider awsCredentialsProvider = mock(AwsCredentialsProvider.class);
+ when(awsCredentialsSupplier.getProvider(any())).thenReturn(awsCredentialsProvider);
+
+ objectUnderTest = OpenSearchClientFactory.fromConnectionConfiguration(connectionConfiguration);
+ objectUnderTest.createRestHighLevelClient(awsCredentialsSupplier);
+
+ final ArgumentCaptor awsCredentialsOptionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class);
+ verify(awsCredentialsSupplier).getProvider(awsCredentialsOptionsArgumentCaptor.capture());
+ final AwsCredentialsOptions actualOptions = awsCredentialsOptionsArgumentCaptor.getValue();
+
+ assertThat(actualOptions.getStsRoleArn(), equalTo(TEST_ROLE));
+ assertThat(actualOptions.getRegion(), equalTo(Region.US_EAST_1));
+ assertThat(actualOptions.getStsHeaderOverrides(), notNullValue());
+ assertThat(actualOptions.getStsHeaderOverrides().size(), equalTo(2));
+ assertThat(actualOptions.getStsHeaderOverrides(), hasKey(headerName1));
+ assertThat(actualOptions.getStsHeaderOverrides().get(headerName1), equalTo(headerValue1));
+ assertThat(actualOptions.getStsHeaderOverrides(), hasKey(headerName2));
+ assertThat(actualOptions.getStsHeaderOverrides().get(headerName2), equalTo(headerValue2));
+ }
+
+ @Test
+ void testcreateRestHighLevelClient_WithValidHttpProxy_HostIP() throws IOException {
+ final Map metadata = generateConfigurationMetadata(
+ TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false);
+ final String testHttpProxy = "121.121.121.121:80";
+ metadata.put(PROXY, testHttpProxy);
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ metadata, ConnectionConfiguration.class);
+ assertEquals(connectionConfiguration.getProxy(), testHttpProxy);
+ objectUnderTest = OpenSearchClientFactory.fromConnectionConfiguration(connectionConfiguration);
+ final RestHighLevelClient client = objectUnderTest.createRestHighLevelClient(awsCredentialsSupplier);
+ assertNotNull(client);
+ client.close();
+ }
+
+ @Test
+ void testcreateRestHighLevelClient_WithValidHttpProxy_HostName() throws IOException {
+ final Map metadata = generateConfigurationMetadata(
+ TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false);
+ final String testHttpProxy = "example.com:80";
+ metadata.put(PROXY, testHttpProxy);
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ metadata, ConnectionConfiguration.class);
+ assertEquals(connectionConfiguration.getProxy(), testHttpProxy);
+ objectUnderTest = OpenSearchClientFactory.fromConnectionConfiguration(connectionConfiguration);
+ final RestHighLevelClient client = objectUnderTest.createRestHighLevelClient(awsCredentialsSupplier);
+ assertNotNull(client);
+ client.close();
+ }
+
+ @Test
+ void testcreateRestHighLevelClient_WithValidHttpProxy_SchemeProvided() throws IOException {
+ final Map metadata = generateConfigurationMetadata(
+ TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false);
+ final String testHttpProxy = "http://example.com:4350";
+ metadata.put(PROXY, testHttpProxy);
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ metadata, ConnectionConfiguration.class);
+ assertEquals(connectionConfiguration.getProxy(), testHttpProxy);
+ objectUnderTest = OpenSearchClientFactory.fromConnectionConfiguration(connectionConfiguration);
+ final RestHighLevelClient client = objectUnderTest.createRestHighLevelClient(awsCredentialsSupplier);
+ assertNotNull(client);
+ client.close();
+ }
+
+ @Test
+ void testcreateRestHighLevelClient_WithInvalidHttpProxy_InvalidPort() {
+ final Map metadata = generateConfigurationMetadata(
+ TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false);
+ final String testHttpProxy = "example.com:port";
+ metadata.put(PROXY, testHttpProxy);
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ metadata, ConnectionConfiguration.class);
+ objectUnderTest = OpenSearchClientFactory.fromConnectionConfiguration(connectionConfiguration);
+ assertEquals(connectionConfiguration.getProxy(), testHttpProxy);
+ assertThrows(IllegalArgumentException.class, () -> objectUnderTest.createRestHighLevelClient(awsCredentialsSupplier));
+ }
+
+ @Test
+ void testcreateRestHighLevelClient_WithInvalidHttpProxy_NoPort() {
+ final Map metadata = generateConfigurationMetadata(
+ TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false);
+ final String testHttpProxy = "example.com";
+ metadata.put(PROXY, testHttpProxy);
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ metadata, ConnectionConfiguration.class);
+ objectUnderTest = OpenSearchClientFactory.fromConnectionConfiguration(connectionConfiguration);
+ assertThrows(IllegalArgumentException.class, () -> objectUnderTest.createRestHighLevelClient(awsCredentialsSupplier));
+ }
+
+ @Test
+ void testcreateRestHighLevelClient_WithInvalidHttpProxy_PortNotInRange() {
+ final Map metadata = generateConfigurationMetadata(
+ TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false);
+ final String testHttpProxy = "example.com:888888";
+ metadata.put(PROXY, testHttpProxy);
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ metadata, ConnectionConfiguration.class);
+ objectUnderTest = OpenSearchClientFactory.fromConnectionConfiguration(connectionConfiguration);
+ assertThrows(IllegalArgumentException.class, () -> objectUnderTest.createRestHighLevelClient(awsCredentialsSupplier));
+ }
+
+ @Test
+ void testcreateRestHighLevelClient_WithInvalidHttpProxy_NotHttp() {
+ final Map metadata = generateConfigurationMetadata(
+ TEST_HOSTS, TEST_USERNAME, TEST_PASSWORD, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, TEST_CERT_PATH, false);
+ final String testHttpProxy = "socket://example.com:port";
+ metadata.put(PROXY, testHttpProxy);
+ final ConnectionConfiguration connectionConfiguration = OBJECT_MAPPER.convertValue(
+ metadata, ConnectionConfiguration.class);
+ assertEquals(connectionConfiguration.getProxy(), testHttpProxy);
+ objectUnderTest = OpenSearchClientFactory.fromConnectionConfiguration(connectionConfiguration);
+ assertThrows(IllegalArgumentException.class, () -> objectUnderTest.createRestHighLevelClient(awsCredentialsSupplier));
+ }
+
+ private Map generateConfigurationMetadata(
+ final List hosts, final String username, final String password,
+ final Integer connectTimeout, final Integer socketTimeout, final boolean awsSigv4, final String awsRegion,
+ final String awsStsRoleArn, final String certPath, final boolean insecure) {
+ final Map metadata = new HashMap<>();
+ metadata.put("hosts", hosts);
+ metadata.put("username", username);
+ metadata.put("password", password);
+ metadata.put("connect_timeout", connectTimeout);
+ metadata.put("socket_timeout", socketTimeout);
+ metadata.put("aws_sigv4", awsSigv4);
+ if (awsRegion != null) {
+ metadata.put("aws_region", awsRegion);
+ }
+ metadata.put("aws_sts_role_arn", awsStsRoleArn);
+ metadata.put("cert", certPath);
+ metadata.put("insecure", insecure);
+ return metadata;
+ }
+
+ private Map generateConfigurationMetadataWithAwsOption(
+ final List hosts, final String username, final String password,
+ final Integer connectTimeout, final Integer socketTimeout, final boolean serverless, final boolean awsSigv4, final String awsRegion,
+ final String awsStsRoleArn, final String awsStsExternalId, final String certPath, final boolean insecure, Map headerOverridesMap) {
+ final Map metadata = new HashMap<>();
+ final Map awsOptionMetadata = new HashMap<>();
+ metadata.put("hosts", hosts);
+ metadata.put("username", username);
+ metadata.put("password", password);
+ metadata.put("connect_timeout", connectTimeout);
+ metadata.put("socket_timeout", socketTimeout);
+ if (awsRegion != null) {
+ awsOptionMetadata.put("region", awsRegion);
+ }
+ awsOptionMetadata.put("serverless", serverless);
+ awsOptionMetadata.put("sts_role_arn", awsStsRoleArn);
+ awsOptionMetadata.put("sts_external_id", awsStsExternalId);
+ awsOptionMetadata.put("sts_header_overrides", headerOverridesMap);
+ metadata.put("aws", awsOptionMetadata);
+ metadata.put("cert", certPath);
+ metadata.put("insecure", insecure);
+ return metadata;
+ }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/otel-trace-group-processor/src/test/resources/test-ca.pem b/data-prepper-plugins/otel-trace-group-processor/src/test/resources/test-ca.pem
new file mode 100644
index 0000000000..4015d866e1
--- /dev/null
+++ b/data-prepper-plugins/otel-trace-group-processor/src/test/resources/test-ca.pem
@@ -0,0 +1,24 @@
+-----BEGIN CERTIFICATE-----
+MIID/jCCAuagAwIBAgIBATANBgkqhkiG9w0BAQsFADCBjzETMBEGCgmSJomT8ixk
+ARkWA2NvbTEXMBUGCgmSJomT8ixkARkWB2V4YW1wbGUxGTAXBgNVBAoMEEV4YW1w
+bGUgQ29tIEluYy4xITAfBgNVBAsMGEV4YW1wbGUgQ29tIEluYy4gUm9vdCBDQTEh
+MB8GA1UEAwwYRXhhbXBsZSBDb20gSW5jLiBSb290IENBMB4XDTE4MDQyMjAzNDM0
+NloXDTI4MDQxOTAzNDM0NlowgY8xEzARBgoJkiaJk/IsZAEZFgNjb20xFzAVBgoJ
+kiaJk/IsZAEZFgdleGFtcGxlMRkwFwYDVQQKDBBFeGFtcGxlIENvbSBJbmMuMSEw
+HwYDVQQLDBhFeGFtcGxlIENvbSBJbmMuIFJvb3QgQ0ExITAfBgNVBAMMGEV4YW1w
+bGUgQ29tIEluYy4gUm9vdCBDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC
+ggEBAK/u+GARP5innhpXK0c0q7s1Su1VTEaIgmZr8VWI6S8amf5cU3ktV7WT9SuV
+TsAm2i2A5P+Ctw7iZkfnHWlsC3HhPUcd6mvzGZ4moxnamM7r+a9otRp3owYoGStX
+ylVTQusAjbq9do8CMV4hcBTepCd+0w0v4h6UlXU8xjhj1xeUIz4DKbRgf36q0rv4
+VIX46X72rMJSETKOSxuwLkov1ZOVbfSlPaygXIxqsHVlj1iMkYRbQmaTib6XWHKf
+MibDaqDejOhukkCjzpptGZOPFQ8002UtTTNv1TiaKxkjMQJNwz6jfZ53ws3fh1I0
+RWT6WfM4oeFRFnyFRmc4uYTUgAkCAwEAAaNjMGEwDwYDVR0TAQH/BAUwAwEB/zAf
+BgNVHSMEGDAWgBSSNQzgDx4rRfZNOfN7X6LmEpdAczAdBgNVHQ4EFgQUkjUM4A8e
+K0X2TTnze1+i5hKXQHMwDgYDVR0PAQH/BAQDAgGGMA0GCSqGSIb3DQEBCwUAA4IB
+AQBoQHvwsR34hGO2m8qVR9nQ5Klo5HYPyd6ySKNcT36OZ4AQfaCGsk+SecTi35QF
+RHL3g2qffED4tKR0RBNGQSgiLavmHGCh3YpDupKq2xhhEeS9oBmQzxanFwWFod4T
+nnsG2cCejyR9WXoRzHisw0KJWeuNlwjUdJY0xnn16srm1zL/M/f0PvCyh9HU1mF1
+ivnOSqbDD2Z7JSGyckgKad1Omsg/rr5XYtCeyJeXUPcmpeX6erWJJNTUh6yWC/hY
+G/dFC4xrJhfXwz6Z0ytUygJO32bJG4Np2iGAwvvgI9EfxzEv/KP+FGrJOvQJAq4/
+BU36ZAa80W/8TBnqZTkNnqZV
+-----END CERTIFICATE-----