Skip to content

Commit

Permalink
making the event type default
Browse files Browse the repository at this point in the history
Signed-off-by: Santhosh Gandhe <[email protected]>
  • Loading branch information
san81 committed Nov 17, 2024
1 parent 691db66 commit ff27e16
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,47 +11,47 @@
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import org.opensearch.dataprepper.model.configuration.PluginModel;

import lombok.Getter;
import java.time.Duration;
import lombok.Getter;
import org.opensearch.dataprepper.model.configuration.PluginModel;

@Getter
public class LambdaCommonConfig {
public static final int DEFAULT_CONNECTION_RETRIES = 3;
public static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(60);
public static final String STS_REGION = "region";
public static final String STS_ROLE_ARN = "sts_role_arn";

@JsonProperty("aws")
@NotNull
@Valid
private AwsAuthenticationOptions awsAuthenticationOptions;

@JsonPropertyDescription("Lambda Function Name")
@JsonProperty("function_name")
@NotEmpty
@Size(min = 3, max = 500, message = "function name length should be at least 3 characters")
private String functionName;

@JsonPropertyDescription("Total retries we want before failing")
@JsonProperty("max_retries")
private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES;

@JsonPropertyDescription("invocation type defines the way we want to call lambda function")
@JsonProperty("invocation_type")
private InvocationType invocationType = InvocationType.REQUEST_RESPONSE;

@JsonPropertyDescription("sdk timeout defines the time sdk maintains the connection to the client before timing out")
@JsonProperty("connection_timeout")
private Duration connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;

@JsonProperty("batch")
private BatchOptions batchOptions;

@JsonPropertyDescription("Codec configuration for parsing Lambda responses")
@JsonProperty("response_codec")
@Valid
private PluginModel responseCodecConfig;
public abstract class LambdaCommonConfig {

public static final int DEFAULT_CONNECTION_RETRIES = 3;
public static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(60);
public static final String STS_REGION = "region";
public static final String STS_ROLE_ARN = "sts_role_arn";

@JsonProperty("aws")
@NotNull
@Valid
private AwsAuthenticationOptions awsAuthenticationOptions;

@JsonPropertyDescription("Lambda Function Name")
@JsonProperty("function_name")
@NotEmpty
@Size(min = 3, max = 500, message = "function name length should be at least 3 characters")
private String functionName;

@JsonPropertyDescription("Total retries we want before failing")
@JsonProperty("max_retries")
private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES;


@JsonPropertyDescription("sdk timeout defines the time sdk maintains the connection to the client before timing out")
@JsonProperty("connection_timeout")
private Duration connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;

@JsonProperty("batch")
private BatchOptions batchOptions;

@JsonPropertyDescription("Codec configuration for parsing Lambda responses")
@JsonProperty("response_codec")
@Valid
private PluginModel responseCodecConfig;


public abstract InvocationType getInvocationType();

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,47 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig;

import java.util.Collections;
import java.util.List;
import org.opensearch.dataprepper.plugins.lambda.common.config.InvocationType;
import org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig;

public class LambdaProcessorConfig extends LambdaCommonConfig {

@JsonPropertyDescription("Defines the way Data Prepper treats the response from Lambda")
@JsonProperty("response_events_match")
private boolean responseEventsMatch = false;

@JsonPropertyDescription("defines a condition for event to use this processor")
@JsonProperty("lambda_when")
private String whenCondition;

@JsonProperty("tags_on_failure")
@JsonPropertyDescription("A <code>List</code> of <code>String</code>s that specifies the tags to be set in the event when lambda fails to " +
"or exception occurs. This tag may be used in conditional expressions in " +
"other parts of the configuration")
private List<String> tagsOnFailure = Collections.emptyList();

public List<String> getTagsOnFailure(){
return tagsOnFailure;
}

public String getWhenCondition() {
return whenCondition;
}

public Boolean getResponseEventsMatch() {
return responseEventsMatch;
}

@JsonPropertyDescription("invocation type defines the way we want to call lambda function")
@JsonProperty("invocation_type")
private InvocationType invocationType = InvocationType.REQUEST_RESPONSE;

@JsonPropertyDescription("Defines the way Data Prepper treats the response from Lambda")
@JsonProperty("response_events_match")
private boolean responseEventsMatch = false;

@JsonPropertyDescription("defines a condition for event to use this processor")
@JsonProperty("lambda_when")
private String whenCondition;

@JsonProperty("tags_on_failure")
@JsonPropertyDescription(
"A <code>List</code> of <code>String</code>s that specifies the tags to be set in the event when lambda fails to "
+
"or exception occurs. This tag may be used in conditional expressions in " +
"other parts of the configuration")
private List<String> tagsOnFailure = Collections.emptyList();

public List<String> getTagsOnFailure() {
return tagsOnFailure;
}

public String getWhenCondition() {
return whenCondition;
}

public Boolean getResponseEventsMatch() {
return responseEventsMatch;
}

@Override
public InvocationType getInvocationType() {
return invocationType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,44 @@
package org.opensearch.dataprepper.plugins.lambda.sink;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig;

import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import java.util.Map;
import java.util.Objects;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.plugins.lambda.common.config.InvocationType;
import org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig;

public class LambdaSinkConfig extends LambdaCommonConfig {

@JsonProperty("dlq")
private PluginModel dlq;

public PluginModel getDlq() {
return dlq;
}

public String getDlqStsRoleARN(){
return Objects.nonNull(getDlqPluginSetting().get(STS_ROLE_ARN)) ?
String.valueOf(getDlqPluginSetting().get(STS_ROLE_ARN)) :
getAwsAuthenticationOptions().getAwsStsRoleArn();
}

public String getDlqStsRegion(){
return Objects.nonNull(getDlqPluginSetting().get(STS_REGION)) ?
String.valueOf(getDlqPluginSetting().get(STS_REGION)) :
getAwsAuthenticationOptions().getAwsRegion().toString();
}

public Map<String, Object> getDlqPluginSetting(){
return dlq != null ? dlq.getPluginSettings() : null;
}

@JsonPropertyDescription("invocation type defines the way we want to call lambda function")
@JsonProperty("invocation_type")
private InvocationType invocationType = InvocationType.EVENT;

@JsonProperty("dlq")
private PluginModel dlq;

public PluginModel getDlq() {
return dlq;
}

public String getDlqStsRoleARN() {
return Objects.nonNull(getDlqPluginSetting().get(STS_ROLE_ARN)) ?
String.valueOf(getDlqPluginSetting().get(STS_ROLE_ARN)) :
getAwsAuthenticationOptions().getAwsStsRoleArn();
}

public String getDlqStsRegion() {
return Objects.nonNull(getDlqPluginSetting().get(STS_REGION)) ?
String.valueOf(getDlqPluginSetting().get(STS_REGION)) :
getAwsAuthenticationOptions().getAwsRegion().toString();
}

public Map<String, Object> getDlqPluginSetting() {
return dlq != null ? dlq.getPluginSettings() : null;
}

@Override
public InvocationType getInvocationType() {
return invocationType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,52 @@

package org.opensearch.dataprepper.plugins.lambda.common.config;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessorConfig;
import software.amazon.awssdk.regions.Region;

public class LambdaCommonConfigTest {
private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));

@Test
void test_defaults() {
final LambdaCommonConfig lambdaCommonConfig = new LambdaCommonConfig();
assertThat(lambdaCommonConfig.getFunctionName(), equalTo(null));
assertThat(lambdaCommonConfig.getAwsAuthenticationOptions(), equalTo(null));
assertThat(lambdaCommonConfig.getBatchOptions(), equalTo(null));
assertThat(lambdaCommonConfig.getResponseCodecConfig(), equalTo(null));
assertThat(lambdaCommonConfig.getConnectionTimeout(), equalTo(LambdaCommonConfig.DEFAULT_CONNECTION_TIMEOUT));
assertThat(lambdaCommonConfig.getMaxConnectionRetries(), equalTo(LambdaCommonConfig.DEFAULT_CONNECTION_RETRIES));
assertThat(lambdaCommonConfig.getInvocationType(), equalTo(InvocationType.REQUEST_RESPONSE));
}

@Test
public void testAwsAuthenticationOptionsNotNull() throws JsonProcessingException {
final String config = " function_name: test_function\n" + " aws:\n" + " region: ap-south-1\n" + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + " sts_header_overrides: {\"test\":\"test\"}\n" + " max_retries: 10\n";
final LambdaCommonConfig lambdaCommonConfig = objectMapper.readValue(config, LambdaCommonConfig.class);

assertThat(lambdaCommonConfig.getMaxConnectionRetries(), equalTo(10));
assertThat(lambdaCommonConfig.getAwsAuthenticationOptions().getAwsRegion(), equalTo(Region.AP_SOUTH_1));
assertThat(lambdaCommonConfig.getAwsAuthenticationOptions().getAwsStsRoleArn(), equalTo("arn:aws:iam::524239988912:role/app-test"));
assertThat(lambdaCommonConfig.getAwsAuthenticationOptions().getAwsStsHeaderOverrides().get("test"), equalTo("test"));
}

private final ObjectMapper objectMapper = new ObjectMapper(
new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));

@Test
void test_defaults() {
final LambdaCommonConfig lambdaCommonConfig = new LambdaProcessorConfig();
assertThat(lambdaCommonConfig.getFunctionName(), equalTo(null));
assertThat(lambdaCommonConfig.getAwsAuthenticationOptions(), equalTo(null));
assertThat(lambdaCommonConfig.getBatchOptions(), equalTo(null));
assertThat(lambdaCommonConfig.getResponseCodecConfig(), equalTo(null));
assertThat(lambdaCommonConfig.getConnectionTimeout(),
equalTo(LambdaCommonConfig.DEFAULT_CONNECTION_TIMEOUT));
assertThat(lambdaCommonConfig.getMaxConnectionRetries(),
equalTo(LambdaCommonConfig.DEFAULT_CONNECTION_RETRIES));
assertThat(lambdaCommonConfig.getInvocationType(), equalTo(InvocationType.REQUEST_RESPONSE));
}

@Test
public void testAwsAuthenticationOptionsNotNull() throws JsonProcessingException {
final String config = " function_name: test_function\n" + " aws:\n"
+ " region: ap-south-1\n"
+ " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n"
+ " sts_header_overrides: {\"test\":\"test\"}\n" + " max_retries: 10\n";
final LambdaProcessorConfig lambdaCommonConfig = objectMapper.readValue(config,
LambdaProcessorConfig.class);

assertThat(lambdaCommonConfig.getMaxConnectionRetries(), equalTo(10));
assertThat(lambdaCommonConfig.getAwsAuthenticationOptions().getAwsRegion(),
equalTo(Region.AP_SOUTH_1));
assertThat(lambdaCommonConfig.getAwsAuthenticationOptions().getAwsStsRoleArn(),
equalTo("arn:aws:iam::524239988912:role/app-test"));
assertThat(
lambdaCommonConfig.getAwsAuthenticationOptions().getAwsStsHeaderOverrides().get("test"),
equalTo("test"));
}
}

0 comments on commit ff27e16

Please sign in to comment.