diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java index 001689bccf..8240943374 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java @@ -41,6 +41,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; import static org.opensearch.dataprepper.plugins.lambda.common.LambdaCommonHandler.isSuccess; @DataPrepperPlugin(name = "aws_lambda", pluginType = Sink.class, pluginConfigurationType = LambdaSinkConfig.class) @@ -172,19 +173,21 @@ public void doOutput(final Collection> records) { Duration latency = inputBuffer.stopLatencyWatch(); lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS); requestPayloadMetric.record(inputBuffer.getPayloadRequestSize()); - if (isSuccess(response)) { - releaseEventHandlesPerBatch(true, inputBuffer.getRecords()); - numberOfRecordsSuccessCounter.increment(inputBuffer.getEventCount()); - numberOfRequestsSuccessCounter.increment(); - if (response.payload() != null) { - responsePayloadMetric.record(response.payload().asByteArray().length); - } - } else { - LOG.error("Lambda invoke failed with error {} ", response.statusCode()); - handleFailure(new RuntimeException("failed"), inputBuffer); + if (!isSuccess(response)) { + String errorMessage = String.format("Lambda invoke failed with status code %s error %s ", + response.statusCode(), response.payload().asUtf8String()); + throw new RuntimeException(errorMessage); } + + releaseEventHandlesPerBatch(true, inputBuffer.getRecords()); + numberOfRecordsSuccessCounter.increment(inputBuffer.getEventCount()); + numberOfRequestsSuccessCounter.increment(); + if (response.payload() != null) { + responsePayloadMetric.record(response.payload().asByteArray().length); + } + } catch (Exception e) { - LOG.error("Exception from Lambda invocation ", e); + LOG.error(NOISY, e.getMessage(), e); numberOfRecordsFailedCounter.increment(inputBuffer.getEventCount()); numberOfRequestsFailedCounter.increment(); handleFailure(new RuntimeException("failed"), inputBuffer);