diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/LambdaCommonHandler.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/LambdaCommonHandler.java index a39581be6b..5b65067543 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/LambdaCommonHandler.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/LambdaCommonHandler.java @@ -42,40 +42,40 @@ public class LambdaCommonHandler { private final LambdaAsyncClient lambdaAsyncClient; private final String functionName; private final String invocationType; - private final LambdaCommonConfig config; - private final String whenCondition; + private final LambdaCommonConfig config; + private final String whenCondition; BufferFactory bufferFactory; - final InputCodec responseCodec; - final ExpressionEvaluator expressionEvaluator; - JsonOutputCodecConfig jsonOutputCodecConfig; + final InputCodec responseCodec; + final ExpressionEvaluator expressionEvaluator; + JsonOutputCodecConfig jsonOutputCodecConfig; private final int maxEvents; private final ByteCount maxBytes; private final Duration maxCollectionDuration; private final ResponseEventHandlingStrategy responseStrategy; - public LambdaCommonHandler(final Logger log, - final LambdaAsyncClient lambdaAsyncClient, - final JsonOutputCodecConfig jsonOutputCodecConfig, - final InputCodec responseCodec, - final String whenCondition, - final ExpressionEvaluator expressionEvaluator, - final ResponseEventHandlingStrategy responseStrategy, - final LambdaCommonConfig lambdaCommonConfig) { - this.LOG = log; + public LambdaCommonHandler(final Logger log, + final LambdaAsyncClient lambdaAsyncClient, + final JsonOutputCodecConfig jsonOutputCodecConfig, + final InputCodec responseCodec, + final String whenCondition, + final ExpressionEvaluator expressionEvaluator, + final ResponseEventHandlingStrategy responseStrategy, + final LambdaCommonConfig lambdaCommonConfig) { + this.LOG = log; this.lambdaAsyncClient = lambdaAsyncClient; - this.responseStrategy = responseStrategy; - this.config = lambdaCommonConfig; - this.jsonOutputCodecConfig = jsonOutputCodecConfig; - this.whenCondition = whenCondition; - this.responseCodec = responseCodec; - this.expressionEvaluator = expressionEvaluator; + this.responseStrategy = responseStrategy; + this.config = lambdaCommonConfig; + this.jsonOutputCodecConfig = jsonOutputCodecConfig; + this.whenCondition = whenCondition; + this.responseCodec = responseCodec; + this.expressionEvaluator = expressionEvaluator; this.functionName = config.getFunctionName(); this.invocationType = config.getInvocationType().getAwsLambdaValue(); maxEvents = lambdaCommonConfig.getBatchOptions().getThresholdOptions().getEventCount(); maxBytes = lambdaCommonConfig.getBatchOptions().getThresholdOptions().getMaximumSize(); maxCollectionDuration = lambdaCommonConfig.getBatchOptions().getThresholdOptions().getEventCollectTimeOut(); bufferFactory = new InMemoryBufferFactory(); - } + } public LambdaCommonHandler(final Logger log, final LambdaAsyncClient lambdaAsyncClient, @@ -115,51 +115,50 @@ public void waitForFutures(List> futureList) { } } - public List> sendRecords(Collection> records, - BiConsumer>> successHandler, BiConsumer>> failureHandler) { - List> resultRecords = Collections.synchronizedList(new ArrayList()); - boolean createNewBuffer = true; - Buffer currentBufferPerBatch = null; - OutputCodec requestCodec = null; + public List> sendRecords(Collection> records, + BiConsumer>> successHandler, BiConsumer>> failureHandler) { + List> resultRecords = Collections.synchronizedList(new ArrayList()); + boolean createNewBuffer = true; + Buffer currentBufferPerBatch = null; + OutputCodec requestCodec = null; List futureList = new ArrayList<>(); for (Record record : records) { final Event event = record.getData(); // If the condition is false, add the event to resultRecords as-is if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) { - synchronized(resultRecords) { - resultRecords.add(record); - } + synchronized(resultRecords) { + resultRecords.add(record); + } continue; } - try { - if (createNewBuffer) { - currentBufferPerBatch = createBuffer(bufferFactory); - requestCodec = new JsonOutputCodec(jsonOutputCodecConfig); - requestCodec.start(currentBufferPerBatch.getOutputStream(), event, new OutputCodecContext()); - } - requestCodec.writeEvent(event, currentBufferPerBatch.getOutputStream()); - } catch (IOException ex) { - LOG.error("Failed to start or write to request codec"); - break; - } - currentBufferPerBatch.addRecord(record); - createNewBuffer = flushToLambdaIfNeeded(resultRecords, currentBufferPerBatch, - requestCodec, futureList, successHandler, failureHandler, false); - } + try { + if (createNewBuffer) { + currentBufferPerBatch = createBuffer(bufferFactory); + requestCodec = new JsonOutputCodec(jsonOutputCodecConfig); + requestCodec.start(currentBufferPerBatch.getOutputStream(), event, new OutputCodecContext()); + } + requestCodec.writeEvent(event, currentBufferPerBatch.getOutputStream()); + } catch (IOException ex) { + LOG.error("Failed to start or write to request codec"); + break; + } + currentBufferPerBatch.addRecord(record); + createNewBuffer = flushToLambdaIfNeeded(resultRecords, currentBufferPerBatch, + requestCodec, futureList, successHandler, failureHandler, false); + } if (!createNewBuffer && currentBufferPerBatch.getEventCount() > 0) { - flushToLambdaIfNeeded(resultRecords, currentBufferPerBatch, - requestCodec, futureList, successHandler, failureHandler, true); - } + flushToLambdaIfNeeded(resultRecords, currentBufferPerBatch, + requestCodec, futureList, successHandler, failureHandler, true); + } waitForFutures(futureList); return resultRecords; - } + } boolean flushToLambdaIfNeeded(List> resultRecords, Buffer currentBufferPerBatch, - OutputCodec requestCodec, List futureList, - BiConsumer>> successHandler, BiConsumer>> failureHandler, - boolean forceFlush) { + OutputCodec requestCodec, List futureList, BiConsumer>> successHandler, + BiConsumer>> failureHandler, boolean forceFlush) { LOG.debug("currentBufferPerBatchEventCount:{}, maxEvents:{}, maxBytes:{}, " + "maxCollectionDuration:{}, forceFlush:{} ", currentBufferPerBatch.getEventCount(), @@ -202,8 +201,8 @@ boolean flushToLambdaIfNeeded(List> resultRecords, Buffer currentB } private void handleLambdaResponse(List> resultRecords, Buffer flushedBuffer, - int eventCount, InvokeResponse response, - BiConsumer>> successHandler, BiConsumer>> failureHandler) { + int eventCount, InvokeResponse response, BiConsumer>> successHandler, + BiConsumer>> failureHandler) { boolean success = checkStatusCode(response); if (success) { LOG.info("Successfully flushed {} events", eventCount); @@ -259,10 +258,10 @@ void convertLambdaResponseToEvent(final List> resultRecords, final LOG.debug("Parsed Event Size:{}, FlushedBuffer eventCount:{}, " + "FlushedBuffer size:{}", parsedEvents.size(), flushedBuffer.getEventCount(), flushedBuffer.getSize()); - synchronized(resultRecords) { - responseStrategy.handleEvents(parsedEvents, originalRecords, resultRecords, flushedBuffer); - successHandler.accept(flushedBuffer, originalRecords); - } + synchronized(resultRecords) { + responseStrategy.handleEvents(parsedEvents, originalRecords, resultRecords, flushedBuffer); + successHandler.accept(flushedBuffer, originalRecords); + } } } catch (Exception e) { LOG.error(NOISY, "Error converting Lambda response to Event"); @@ -282,11 +281,10 @@ void handleFailure(Throwable e, Buffer flushedBuffer, List> result if (flushedBuffer.getEventCount() > 0) { //numberOfRecordsFailedCounter.increment(flushedBuffer.getEventCount()); } - synchronized(resultRecords) { - failureHandler.accept(flushedBuffer, resultRecords); - } + synchronized(resultRecords) { + failureHandler.accept(flushedBuffer, resultRecords); + } - //addFailureTags(flushedBuffer, resultRecords); LOG.error(NOISY, "Failed to process batch due to error: ", e); } catch(Exception ex){ LOG.error(NOISY, "Exception in handleFailure while processing failure for buffer: ", ex); diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java index 6b8c422dd3..275373dc1c 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java @@ -61,7 +61,7 @@ public class LambdaProcessor extends AbstractProcessor, Record> doExecute(Collection> records) { InputCodec responseCodec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSetting); lambdaCommonHandler = new LambdaCommonHandler(LOG, lambdaAsyncClient, jsonOutputCodecConfig, responseCodec, whenCondition, expressionEvaluator, responseStrategy, lambdaProcessorConfig); - return lambdaCommonHandler.sendRecords(records, (inputBuffer, resultRecords)->{}, (inputBuffer, resultRecords)->{ addFailureTags(inputBuffer, resultRecords);}); - } + return lambdaCommonHandler.sendRecords(records, (inputBuffer, resultRecords)->{}, (inputBuffer, resultRecords)->{ addFailureTags(inputBuffer, resultRecords);}); + } private void addFailureTags(Buffer flushedBuffer, List> resultRecords) { // Add failure tags to each event in the batch diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfigTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfigTest.java index c51a0cdceb..c6d2202ccb 100644 --- a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfigTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfigTest.java @@ -24,7 +24,7 @@ public class LambdaProcessorConfigTest { private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); @Test - void test_defaults() { + void test_defaults() { final LambdaProcessorConfig lambdaProcessorConfig = new LambdaProcessorConfig(); assertThat(lambdaProcessorConfig.getTagsOnMatchFailure(), equalTo(List.of())); assertThat(lambdaProcessorConfig.getWhenCondition(), equalTo(null)); diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java index 0dd6c72130..69f9669d48 100644 --- a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java @@ -147,7 +147,7 @@ public void setUp() throws Exception { when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.ofSeconds(30)); when(batchOptions.getKeyName()).thenReturn("key"); - // Mock Response Codec Configuration + // Mock Response Codec Configuration PluginModel responseCodecConfig = lambdaProcessorConfig.getResponseCodecConfig(); PluginSetting responseCodecPluginSetting; diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfigTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfigTest.java index 600c898318..1b83e5685b 100644 --- a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfigTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfigTest.java @@ -60,7 +60,6 @@ void lambda_sink_pipeline_config_test_with_no_dlq() throws JsonProcessingExcepti " max_retries: 10\n"; final LambdaSinkConfig lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class); assertThat(lambdaSinkConfig.getDlq(),equalTo(null)); -/* assertThat(lambdaSinkConfig.getMaxConnectionRetries(),equalTo(10)); assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsRegion(),equalTo(Region.AP_SOUTH_1)); assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsStsRoleArn(),equalTo("arn:aws:iam::524239988912:role/app-test")); @@ -68,6 +67,5 @@ void lambda_sink_pipeline_config_test_with_no_dlq() throws JsonProcessingExcepti assertThat(lambdaSinkConfig.getDlqStsRegion(),equalTo("ap-south-1")); assertThat(lambdaSinkConfig.getDlqStsRoleARN(),equalTo("arn:aws:iam::524239988912:role/app-test")); assertThat(lambdaSinkConfig.getDlqPluginSetting().get("key"),equalTo(null)); -*/ } }