Skip to content

Commit

Permalink
Address Scale Items for lambda plugin (#5032)
Browse files Browse the repository at this point in the history
* Add support for lambda async client in lambda processor

Signed-off-by: Srikanth Govindarajan <[email protected]>

Refactor aws lambda plugin to have a class for common methods between processor and sink

Signed-off-by: Srikanth Govindarajan <[email protected]>

Add support for lambda async client in lambda sink

Signed-off-by: Srikanth Govindarajan <[email protected]>

* Changes to Lambda Plugin Integration Test

Signed-off-by: Srikanth Govindarajan <[email protected]>

* Add JsonPropertyDescription to all Config, add debug logs and add ITs

Signed-off-by: Srikanth Govindarajan <[email protected]>

* Address Acknowledgements for processor and sink; Add request and response codec

Signed-off-by: Srikanth Govindarajan <[email protected]>

* Address comments

Signed-off-by: Srikanth Govindarajan <[email protected]>

* Add response processing mode to processor configuration

Signed-off-by: Srikanth Govindarajan <[email protected]>

* Add Response Handling Strategy; Make InvocationType and ResponseCardinality enums; Change reponse_processing_mode option to response_cardinality

Signed-off-by: Srikanth Govindarajan <[email protected]>

* Address Enum

Signed-off-by: Srikanth Govindarajan <[email protected]>

* Address Enum2

Signed-off-by: Srikanth Govindarajan <[email protected]>

* Fix checkstyle

Signed-off-by: Srikanth Govindarajan <[email protected]>

---------

Signed-off-by: Srikanth Govindarajan <[email protected]>
  • Loading branch information
srikanthjg authored Oct 29, 2024
1 parent cfaf19d commit e311f0d
Show file tree
Hide file tree
Showing 35 changed files with 1,947 additions and 1,076 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.InvocationType;
import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -39,10 +40,9 @@
import java.util.List;

@ExtendWith(MockitoExtension.class)

public class LambdaProcessorServiceIT {

private LambdaClient lambdaClient;
private LambdaAsyncClient lambdaAsyncClient;
private String functionName;
private String lambdaRegion;
private String role;
Expand Down Expand Up @@ -82,7 +82,7 @@ public void setUp() throws Exception {

final Region region = Region.of(lambdaRegion);

lambdaClient = LambdaClient.builder()
lambdaAsyncClient = LambdaAsyncClient.builder()
.region(Region.of(lambdaRegion))
.build();

Expand All @@ -103,11 +103,11 @@ private static Record<Event> createRecord() {
public LambdaProcessor createObjectUnderTest(final String config) throws JsonProcessingException {

final LambdaProcessorConfig lambdaProcessorConfig = objectMapper.readValue(config, LambdaProcessorConfig.class);
return new LambdaProcessor(pluginMetrics,lambdaProcessorConfig,awsCredentialsSupplier,expressionEvaluator);
return new LambdaProcessor(pluginFactory,pluginMetrics,lambdaProcessorConfig,awsCredentialsSupplier,expressionEvaluator);
}

public LambdaProcessor createObjectUnderTest(LambdaProcessorConfig lambdaSinkConfig) throws JsonProcessingException {
return new LambdaProcessor(pluginMetrics,lambdaSinkConfig,awsCredentialsSupplier,expressionEvaluator);
return new LambdaProcessor(pluginFactory,pluginMetrics,lambdaSinkConfig,awsCredentialsSupplier,expressionEvaluator);
}


Expand All @@ -131,7 +131,7 @@ void verify_records_to_lambda_success(final int recordCount) throws Exception {

when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName);
when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3);
when(lambdaProcessorConfig.getInvocationType()).thenReturn("RequestResponse");
when(lambdaProcessorConfig.getInvocationType()).thenReturn(InvocationType.REQUEST_RESPONSE);

LambdaProcessor objectUnderTest = createObjectUnderTest(lambdaProcessorConfig);

Expand All @@ -148,7 +148,7 @@ void verify_records_with_batching_to_lambda(final int recordCount) throws JsonPr

when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName);
when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3);
when(lambdaProcessorConfig.getInvocationType()).thenReturn("RequestResponse");
when(lambdaProcessorConfig.getInvocationType()).thenReturn(InvocationType.REQUEST_RESPONSE);
when(thresholdOptions.getEventCount()).thenReturn(1);
when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("2mb"));
when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.parse("PT10s"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.RESPONSE_PAYLOAD_SIZE;
import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -57,7 +57,7 @@
@ExtendWith(MockitoExtension.class)
class LambdaSinkServiceIT {

private LambdaClient lambdaClient;
private LambdaAsyncClient lambdaAsyncClient;
private String functionName;
private String lambdaRegion;
private String role;
Expand Down Expand Up @@ -104,7 +104,7 @@ public void setUp() throws Exception {

final Region region = Region.of(lambdaRegion);

lambdaClient = LambdaClient.builder()
lambdaAsyncClient = LambdaAsyncClient.builder()
.region(Region.of(lambdaRegion))
.build();

Expand All @@ -130,7 +130,7 @@ public LambdaSinkService createObjectUnderTest(final String config) throws JsonP
final LambdaSinkConfig lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class);
OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList());
pluginFactory = null;
return new LambdaSinkService(lambdaClient,
return new LambdaSinkService(lambdaAsyncClient,
lambdaSinkConfig,
pluginMetrics,
pluginFactory,
Expand All @@ -146,7 +146,7 @@ public LambdaSinkService createObjectUnderTest(LambdaSinkConfig lambdaSinkConfig

OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList());
pluginFactory = null;
return new LambdaSinkService(lambdaClient,
return new LambdaSinkService(lambdaAsyncClient,
lambdaSinkConfig,
pluginMetrics,
pluginFactory,
Expand Down Expand Up @@ -196,7 +196,7 @@ void verify_flushed_records_to_lambda_success(final int recordCount) throws Exce
@ValueSource(ints = {1,5,10})
void verify_flushed_records_to_lambda_failed_and_dlq_works(final int recordCount) throws Exception {
final String LAMBDA_SINK_CONFIG_INVALID_FUNCTION_NAME =
" function_name: $$$\n" +
" function_name: $$$\n" +
" aws:\n" +
" region: us-east-1\n" +
" sts_role_arn: arn:aws:iam::176893235612:role/osis-s3-opensearch-role\n" +
Expand All @@ -211,7 +211,7 @@ void verify_flushed_records_to_lambda_failed_and_dlq_works(final int recordCount
objectUnderTest.output(recordsData);
Thread.sleep(Duration.ofSeconds(10).toMillis());

verify( numberOfRecordsFailedCounter, times(recordCount)).increment(1);
verify( numberOfRecordsFailedCounter, times(recordCount)).increment(1);
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.opensearch.dataprepper.plugins.lambda.common;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory;
import org.slf4j.Logger;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class LambdaCommonHandler {
private final Logger LOG;
private final LambdaAsyncClient lambdaAsyncClient;
private final String functionName;
private final String invocationType;
BufferFactory bufferFactory;
private final ObjectMapper objectMapper = new ObjectMapper();

public LambdaCommonHandler(
final Logger log,
final LambdaAsyncClient lambdaAsyncClient,
final String functionName,
final String invocationType,
BufferFactory bufferFactory){
this.LOG = log;
this.lambdaAsyncClient = lambdaAsyncClient;
this.functionName = functionName;
this.invocationType = invocationType;
this.bufferFactory = bufferFactory;
}

public Buffer createBuffer(Buffer currentBuffer) {
try {
LOG.debug("Resetting buffer");
currentBuffer = bufferFactory.getBuffer(lambdaAsyncClient, functionName, invocationType);
return currentBuffer;
} catch (IOException e) {
throw new RuntimeException("Failed to reset buffer", e);
}
}

public boolean checkStatusCode(InvokeResponse response) {
int statusCode = response.statusCode();
if (statusCode < 200 || statusCode >= 300) {
LOG.error("Lambda invocation returned with non-success status code: {}", statusCode);
return false;
}
return true;
}

public void waitForFutures(List<CompletableFuture<Void>> futureList) {
if (!futureList.isEmpty()) {
try {
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
LOG.info("All {} Lambda invocations have completed", futureList.size());
} catch (Exception e) {
LOG.warn("Exception while waiting for Lambda invocations to complete", e);
} finally {
futureList.clear();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@

package org.opensearch.dataprepper.plugins.lambda.common.accumlator;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;

import java.io.OutputStream;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* A buffer can hold data before flushing it.
Expand All @@ -22,27 +26,27 @@ public interface Buffer {

Duration getDuration();

InvokeResponse flushToLambdaAsync();

InvokeResponse flushToLambdaSync();
CompletableFuture<InvokeResponse> flushToLambda(String invocationType);

OutputStream getOutputStream();

SdkBytes getPayload();

void setEventCount(int eventCount);

//Metrics
public Duration getFlushLambdaSyncLatencyMetric();
public void addRecord(Record<Event> record);

public List<Record<Event>> getRecords();

public Duration getFlushLambdaLatencyMetric();

public Long getPayloadRequestSyncSize();
public Long getPayloadRequestSize();

public Duration getFlushLambdaAsyncLatencyMetric();
public Long getPayloadResponseSize();

public Long getPayloadResponseSyncSize();
public Duration stopLatencyWatch();

public Long getPayloadRequestAsyncSize();

public Long getPayloadResponseAsyncSize();
void reset();

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

package org.opensearch.dataprepper.plugins.lambda.common.accumlator;

import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;

import java.io.IOException;

public interface BufferFactory {
Buffer getBuffer(LambdaClient lambdaClient, String functionName, String invocationType) throws IOException;
Buffer getBuffer(LambdaAsyncClient lambdaAsyncClient, String functionName, String invocationType) throws IOException;
}
Loading

0 comments on commit e311f0d

Please sign in to comment.