Skip to content

Commit

Permalink
refactored to remove handlers passing (opensearch-project#5202)
Browse files Browse the repository at this point in the history
* refactored to remove handlers passing

Signed-off-by: Santhosh Gandhe <[email protected]>

* spacing adjusted

Signed-off-by: Santhosh Gandhe <[email protected]>

---------

Signed-off-by: Santhosh Gandhe <[email protected]>
  • Loading branch information
san81 authored Nov 18, 2024
1 parent e0dee50 commit 8645c1e
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
Expand All @@ -33,18 +30,16 @@
public class LambdaCommonHandler {

private static final Logger LOG = LoggerFactory.getLogger(LambdaCommonHandler.class);

private LambdaCommonHandler() {
}

public static boolean isSuccess(InvokeResponse response) {
int statusCode = response.statusCode();
if (statusCode < 200 || statusCode >= 300) {
return false;
}
return true;
return statusCode >= 200 && statusCode < 300;
}

public static void waitForFutures(List<CompletableFuture<InvokeResponse>> futureList) {
public static void waitForFutures(Collection<CompletableFuture<InvokeResponse>> futureList) {

if (!futureList.isEmpty()) {
try {
Expand Down Expand Up @@ -83,50 +78,25 @@ private static List<Buffer> createBufferBatches(Collection<Record<Event>> record
return batchedBuffers;
}

public static List<Record<Event>> sendRecords(Collection<Record<Event>> records,
public static Map<Buffer, CompletableFuture<InvokeResponse>> sendRecords(
Collection<Record<Event>> records,
LambdaCommonConfig config,
LambdaAsyncClient lambdaAsyncClient,
final OutputCodecContext outputCodecContext,
BiFunction<Buffer, InvokeResponse, List<Record<Event>>> successHandler,
Function<Buffer, List<Record<Event>>> failureHandler) {
// Initialize here to void multi-threading issues
// Note: By default, one instance of processor is created across threads.
//List<Record<Event>> resultRecords = Collections.synchronizedList(new ArrayList<>());
List<Record<Event>> resultRecords = new ArrayList<>();
List<CompletableFuture<InvokeResponse>> futureList = new ArrayList<>();
int totalFlushedEvents = 0;
final OutputCodecContext outputCodecContext) {

List<Buffer> batchedBuffers = createBufferBatches(records, config.getBatchOptions(),
outputCodecContext);

Map<Buffer, CompletableFuture> bufferToFutureMap = new HashMap<>();
Map<Buffer, CompletableFuture<InvokeResponse>> bufferToFutureMap = new HashMap<>();
LOG.debug("Batch Chunks created after threshold check: {}", batchedBuffers.size());
for (Buffer buffer : batchedBuffers) {
InvokeRequest requestPayload = buffer.getRequestPayload(config.getFunctionName(),
config.getInvocationType().getAwsLambdaValue());
config.getInvocationType().getAwsLambdaValue());
CompletableFuture<InvokeResponse> future = lambdaAsyncClient.invoke(requestPayload);
futureList.add(future);
bufferToFutureMap.put(buffer, future);
}
waitForFutures(futureList);
for (Map.Entry<Buffer, CompletableFuture> entry : bufferToFutureMap.entrySet()) {
CompletableFuture future = entry.getValue();
Buffer buffer = entry.getKey();
try {
InvokeResponse response = (InvokeResponse) future.join();
if (isSuccess(response)) {
resultRecords.addAll(successHandler.apply(buffer, response));
} else {
LOG.error("Lambda invoke failed with error {} ", response.statusCode());
resultRecords.addAll(failureHandler.apply(buffer));
}
} catch (Exception e) {
LOG.error("Exception from Lambda invocation ", e);
resultRecords.addAll(failureHandler.apply(buffer));
}
}
return resultRecords;

waitForFutures(bufferToFutureMap.values());
return bufferToFutureMap;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.lambda.processor;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import static org.opensearch.dataprepper.plugins.lambda.common.LambdaCommonHandler.isSuccess;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
Expand All @@ -18,6 +19,8 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
Expand Down Expand Up @@ -74,25 +77,24 @@ public class LambdaProcessor extends AbstractProcessor<Record<Event>, Record<Eve
private final DistributionSummary responsePayloadMetric;
private final ResponseEventHandlingStrategy responseStrategy;
private final JsonOutputCodecConfig jsonOutputCodecConfig;
LambdaCommonHandler lambdaCommonHandler;

@DataPrepperPluginConstructor
public LambdaProcessor(final PluginFactory pluginFactory, final PluginMetrics pluginMetrics,
final LambdaProcessorConfig lambdaProcessorConfig,
final AwsCredentialsSupplier awsCredentialsSupplier,
final ExpressionEvaluator expressionEvaluator) {
final LambdaProcessorConfig lambdaProcessorConfig,
final AwsCredentialsSupplier awsCredentialsSupplier,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.expressionEvaluator = expressionEvaluator;
this.pluginFactory = pluginFactory;
this.lambdaProcessorConfig = lambdaProcessorConfig;
this.numberOfRecordsSuccessCounter = pluginMetrics.counter(
NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS);
NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS);
this.numberOfRecordsFailedCounter = pluginMetrics.counter(
NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED);
NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED);
this.numberOfRequestsSuccessCounter = pluginMetrics.counter(
NUMBER_OF_SUCCESSFUL_REQUESTS_TO_LAMBDA);
NUMBER_OF_SUCCESSFUL_REQUESTS_TO_LAMBDA);
this.numberOfRequestsFailedCounter = pluginMetrics.counter(
NUMBER_OF_FAILED_REQUESTS_TO_LAMBDA);
NUMBER_OF_FAILED_REQUESTS_TO_LAMBDA);
this.lambdaLatencyMetric = pluginMetrics.timer(LAMBDA_LATENCY_METRIC);
this.requestPayloadMetric = pluginMetrics.summary(REQUEST_PAYLOAD_SIZE);
this.responsePayloadMetric = pluginMetrics.summary(RESPONSE_PAYLOAD_SIZE);
Expand All @@ -106,20 +108,20 @@ public LambdaProcessor(final PluginFactory pluginFactory, final PluginMetrics pl
codecPluginSetting = new PluginSetting("json", Collections.emptyMap());
} else {
codecPluginSetting = new PluginSetting(responseCodecConfig.getPluginName(),
responseCodecConfig.getPluginSettings());
responseCodecConfig.getPluginSettings());
}

jsonOutputCodecConfig = new JsonOutputCodecConfig();
jsonOutputCodecConfig.setKeyName(lambdaProcessorConfig.getBatchOptions().getKeyName());

ClientOptions clientOptions = lambdaProcessorConfig.getClientOptions();
if(clientOptions == null){
if (clientOptions == null) {
clientOptions = new ClientOptions();
}
lambdaAsyncClient = LambdaClientFactory.createAsyncLambdaClient(
lambdaProcessorConfig.getAwsAuthenticationOptions(),
awsCredentialsSupplier,
clientOptions
lambdaProcessorConfig.getAwsAuthenticationOptions(),
awsCredentialsSupplier,
clientOptions
);

// Select the correct strategy based on the configuration
Expand All @@ -137,41 +139,44 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
return records;
}

List<Record<Event>> resultRecords = Collections.synchronizedList(new ArrayList());
List<Record<Event>> resultRecords = new ArrayList<>();
List<Record<Event>> recordsToLambda = new ArrayList<>();
for (Record<Event> record : records) {
final Event event = record.getData();
// If the condition is false, add the event to resultRecords as-is
if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) {
if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition,
event)) {
resultRecords.add(record);
continue;
}
recordsToLambda.add(record);
}
try {
resultRecords.addAll(
lambdaCommonHandler.sendRecords(recordsToLambda, lambdaProcessorConfig, lambdaAsyncClient,
new OutputCodecContext(),
(inputBuffer, response) -> {
Duration latency = inputBuffer.stopLatencyWatch();
lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS);
numberOfRecordsSuccessCounter.increment(inputBuffer.getEventCount());
numberOfRequestsSuccessCounter.increment();
return convertLambdaResponseToEvent(inputBuffer, response);
},
(inputBuffer) -> {
Duration latency = inputBuffer.stopLatencyWatch();
lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS);
numberOfRecordsFailedCounter.increment(inputBuffer.getEventCount());
numberOfRequestsFailedCounter.increment();
return addFailureTags(inputBuffer.getRecords());
})

);
} catch (Exception e) {
LOG.info("Exception in doExecute");
numberOfRecordsFailedCounter.increment(recordsToLambda.size());
resultRecords.addAll(addFailureTags(recordsToLambda));

Map<Buffer, CompletableFuture<InvokeResponse>> bufferToFutureMap = LambdaCommonHandler.sendRecords(
recordsToLambda, lambdaProcessorConfig, lambdaAsyncClient,
new OutputCodecContext());

for (Map.Entry<Buffer, CompletableFuture<InvokeResponse>> entry : bufferToFutureMap.entrySet()) {
CompletableFuture<InvokeResponse> future = entry.getValue();
Buffer inputBuffer = entry.getKey();
try {
InvokeResponse response = future.join();
Duration latency = inputBuffer.stopLatencyWatch();
lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS);
if (isSuccess(response)) {
numberOfRecordsSuccessCounter.increment(inputBuffer.getEventCount());
numberOfRequestsSuccessCounter.increment();
resultRecords.addAll(convertLambdaResponseToEvent(inputBuffer, response));
} else {
LOG.error("Lambda invoke failed with error {} ", response.statusCode());
resultRecords.addAll(addFailureTags(inputBuffer.getRecords()));
}
} catch (Exception e) {
LOG.error("Exception from Lambda invocation ", e);
numberOfRecordsFailedCounter.increment(inputBuffer.getEventCount());
numberOfRequestsFailedCounter.increment();
resultRecords.addAll(addFailureTags(inputBuffer.getRecords()));
}
}
return resultRecords;
}
Expand All @@ -182,7 +187,7 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
* 2. If it is not an array, then create one event per response.
*/
List<Record<Event>> convertLambdaResponseToEvent(Buffer flushedBuffer,
final InvokeResponse lambdaResponse) {
final InvokeResponse lambdaResponse) {
InputCodec responseCodec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSetting);
List<Record<Event>> originalRecords = flushedBuffer.getRecords();
try {
Expand All @@ -191,8 +196,10 @@ List<Record<Event>> convertLambdaResponseToEvent(Buffer flushedBuffer,
List<Record<Event>> resultRecords = new ArrayList<>();
SdkBytes payload = lambdaResponse.payload();
// Handle null or empty payload
if (payload == null || payload.asByteArray() == null || payload.asByteArray().length == 0) {
LOG.warn(NOISY, "Lambda response payload is null or empty, dropping the original events");
if (payload == null || payload.asByteArray() == null
|| payload.asByteArray().length == 0) {
LOG.warn(NOISY,
"Lambda response payload is null or empty, dropping the original events");
} else {
InputStream inputStream = new ByteArrayInputStream(payload.asByteArray());
//Convert to response codec
Expand All @@ -206,9 +213,10 @@ List<Record<Event>> convertLambdaResponseToEvent(Buffer flushedBuffer,
}

LOG.debug("Parsed Event Size:{}, FlushedBuffer eventCount:{}, " +
"FlushedBuffer size:{}", parsedEvents.size(), flushedBuffer.getEventCount(),
flushedBuffer.getSize());
responseStrategy.handleEvents(parsedEvents, originalRecords, resultRecords, flushedBuffer);
"FlushedBuffer size:{}", parsedEvents.size(), flushedBuffer.getEventCount(),
flushedBuffer.getSize());
responseStrategy.handleEvents(parsedEvents, originalRecords, resultRecords,
flushedBuffer);
}
return resultRecords;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@

package org.opensearch.dataprepper.plugins.lambda.sink;

import static org.opensearch.dataprepper.plugins.lambda.common.LambdaCommonHandler.isSuccess;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
Expand All @@ -37,6 +40,7 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;

@DataPrepperPlugin(name = "aws_lambda", pluginType = Sink.class, pluginConfigurationType = LambdaSinkConfig.class)
public class LambdaSink extends AbstractSink<Record<Event>> {
Expand Down Expand Up @@ -94,7 +98,7 @@ public LambdaSink(final PluginSetting pluginSetting,
this.requestPayloadMetric = pluginMetrics.summary(REQUEST_PAYLOAD_SIZE);
this.responsePayloadMetric = pluginMetrics.summary(RESPONSE_PAYLOAD_SIZE);
ClientOptions clientOptions = lambdaSinkConfig.getClientOptions();
if(clientOptions == null){
if (clientOptions == null) {
clientOptions = new ClientOptions();
}
this.lambdaAsyncClient = LambdaClientFactory.createAsyncLambdaClient(
Expand Down Expand Up @@ -147,28 +151,37 @@ public void doOutput(final Collection<Record<Event>> records) {
}

//Result from lambda is not currently processes.
LambdaCommonHandler.sendRecords(records,
Map<Buffer, CompletableFuture<InvokeResponse>> bufferToFutureMap = LambdaCommonHandler.sendRecords(
records,
lambdaSinkConfig,
lambdaAsyncClient,
outputCodecContext,
(inputBuffer, invokeResponse) -> {
Duration latency = inputBuffer.stopLatencyWatch();
lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS);
outputCodecContext);

for (Map.Entry<Buffer, CompletableFuture<InvokeResponse>> entry : bufferToFutureMap.entrySet()) {
CompletableFuture<InvokeResponse> future = entry.getValue();
Buffer inputBuffer = entry.getKey();
try {
InvokeResponse response = future.join();
Duration latency = inputBuffer.stopLatencyWatch();
lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS);
if (isSuccess(response)) {
numberOfRecordsSuccessCounter.increment(inputBuffer.getEventCount());
numberOfRequestsSuccessCounter.increment();
releaseEventHandlesPerBatch(true, inputBuffer);
return new ArrayList<>();
},
(inputBuffer) -> {
Duration latency = inputBuffer.stopLatencyWatch();
lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS);
numberOfRecordsFailedCounter.increment(inputBuffer.getEventCount());
numberOfRequestsFailedCounter.increment();
} else {
LOG.error("Lambda invoke failed with error {} ", response.statusCode());
handleFailure(new RuntimeException("failed"), inputBuffer);
return new ArrayList<>();
});
}
} catch (Exception e) {
LOG.error("Exception from Lambda invocation ", e);
numberOfRecordsFailedCounter.increment(inputBuffer.getEventCount());
numberOfRequestsFailedCounter.increment();
handleFailure(new RuntimeException("failed"), inputBuffer);
}
}
}


void handleFailure(Throwable throwable, Buffer flushedBuffer) {
try {
if (flushedBuffer.getEventCount() > 0) {
Expand Down
Loading

0 comments on commit 8645c1e

Please sign in to comment.