Skip to content

Commit

Permalink
Address race condition in lambda processor and threading issues in la…
Browse files Browse the repository at this point in the history
…mbda sink

Signed-off-by: Srikanth Govindarajan <[email protected]>
  • Loading branch information
srikanthjg committed Nov 15, 2024
1 parent 62110de commit 430af31
Show file tree
Hide file tree
Showing 14 changed files with 380 additions and 402 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Target({ElementType.CONSTRUCTOR, ElementType.TYPE})
public @interface SingleThread {
}
Original file line number Diff line number Diff line change
@@ -1,43 +1,16 @@
package org.opensearch.dataprepper.plugins.lambda.common;

import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory;
import org.slf4j.Logger;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class LambdaCommonHandler {
private final Logger LOG;
private final LambdaAsyncClient lambdaAsyncClient;
private final String functionName;
private final String invocationType;
BufferFactory bufferFactory;
private static final Logger LOG = LoggerFactory.getLogger(LambdaCommonHandler.class);

public LambdaCommonHandler(
final Logger log,
final LambdaAsyncClient lambdaAsyncClient,
final String functionName,
final String invocationType){
this.LOG = log;
this.lambdaAsyncClient = lambdaAsyncClient;
this.functionName = functionName;
this.invocationType = invocationType;
}

public Buffer createBuffer(BufferFactory bufferFactory) {
try {
LOG.debug("Resetting buffer");
return bufferFactory.getBuffer(lambdaAsyncClient, functionName, invocationType);
} catch (IOException e) {
throw new RuntimeException("Failed to reset buffer", e);
}
}

public boolean checkStatusCode(InvokeResponse response) {
public static boolean checkStatusCode(InvokeResponse response) {
int statusCode = response.statusCode();
if (statusCode < 200 || statusCode >= 300) {
LOG.error("Lambda invocation returned with non-success status code: {}", statusCode);
Expand All @@ -46,7 +19,7 @@ public boolean checkStatusCode(InvokeResponse response) {
return true;
}

public void waitForFutures(List<CompletableFuture<Void>> futureList) {
public static void waitForFutures(List<CompletableFuture<Void>> futureList) {
if (!futureList.isEmpty()) {
try {
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
Expand All @@ -58,4 +31,4 @@ public void waitForFutures(List<CompletableFuture<Void>> futureList) {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -16,7 +15,7 @@ public class AggregateResponseEventHandlingStrategy implements ResponseEventHand

@Override
public void handleEvents(List<Event> parsedEvents, List<Record<Event>> originalRecords,
List<Record<Event>> resultRecords, Buffer flushedBuffer) {
List<Record<Event>> resultRecords) {

Event originalEvent = originalRecords.get(0).getData();
DefaultEventHandle eventHandle = (DefaultEventHandle) originalEvent.getEventHandle();
Expand All @@ -32,5 +31,6 @@ public void handleEvents(List<Event> parsedEvents, List<Record<Event>> originalR
originalAcknowledgementSet.add(responseEvent);
}
}
LOG.info("Successfully handled {} events in Aggregate response strategy", parsedEvents.size());
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.opensearch.dataprepper.plugins.lambda.processor;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.awssdk.core.SdkBytes;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class PayloadValidator {
private static final ObjectMapper objectMapper = new ObjectMapper();

public static InputStream validateAndGetInputStream(SdkBytes payload) throws IOException {
JsonNode jsonNode = objectMapper.readTree(payload.asByteArray());

if (!jsonNode.isArray()) {
throw new IllegalArgumentException("Payload must be a JSON array");
}

return new ByteArrayInputStream(payload.asByteArray());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;

import java.util.List;

public interface ResponseEventHandlingStrategy {
void handleEvents(List<Event> parsedEvents, List<Record<Event>> originalRecords, List<Record<Event>> resultRecords, Buffer flushedBuffer);
void handleEvents(List<Event> parsedEvents, List<Record<Event>> originalRecords,
List<Record<Event>> resultRecords);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,49 @@

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

public class StrictResponseEventHandlingStrategy implements ResponseEventHandlingStrategy {

private static final Logger LOG = LoggerFactory.getLogger(StrictResponseEventHandlingStrategy.class);

@Override
public void handleEvents(List<Event> parsedEvents, List<Record<Event>> originalRecords, List<Record<Event>> resultRecords, Buffer flushedBuffer) {
if (parsedEvents.size() != flushedBuffer.getEventCount()) {
throw new RuntimeException("Response Processing Mode is configured as Strict mode but behavior is aggregate mode. Event count mismatch.");
public void handleEvents(List<Event> parsedEvents, List<Record<Event>> originalRecords,
List<Record<Event>> resultRecords) {
if (parsedEvents.size() != originalRecords.size()) {
LOG.error("Strict response strategy - Event count mismatch: Parsed events size: {}, Original records size: {}",
parsedEvents.size(), originalRecords.size());
throw new RuntimeException("Event count mismatch. Response Processing Mode is configured as Strict mode but behavior is aggregate mode.");
}

for (int i = 0; i < parsedEvents.size(); i++) {
Event responseEvent = parsedEvents.get(i);
Event originalEvent = originalRecords.get(i).getData();
LOG.info("parseEvent size: {} , originalRecords size: {}", parsedEvents.size(),
originalRecords.size());
try {
for (int i = 0; i < parsedEvents.size(); i++) {

// Clear the original event's data
originalEvent.clear();
Event responseEvent = parsedEvents.get(i);
Event originalEvent = originalRecords.get(i).getData();

// Manually copy each key-value pair from the responseEvent to the originalEvent
Map<String, Object> responseData = responseEvent.toMap();
for (Map.Entry<String, Object> entry : responseData.entrySet()) {
originalEvent.put(entry.getKey(), entry.getValue());
}
// Clear the original event's data
originalEvent.clear();

// Add updated event to resultRecords
resultRecords.add(originalRecords.get(i));
// Manually copy each key-value pair from the responseEvent to the originalEvent
Map<String, Object> responseData = responseEvent.toMap();
for (Map.Entry<String, Object> entry : responseData.entrySet()) {
originalEvent.put(entry.getKey(), entry.getValue());
}

// Add updated event to resultRecords
resultRecords.add(originalRecords.get(i));
}
}catch (Exception e){
LOG.info("SRI ERRRRRRRRRROR",e);
}
LOG.info("Successfully handled {} events in Strict response strategy", parsedEvents.size());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ public class LambdaSinkConfig {
@JsonProperty("batch")
private BatchOptions batchOptions;

@JsonPropertyDescription("defines a condition for event to use this processor")
@JsonProperty("lambda_when")
private String whenCondition;

@JsonPropertyDescription("sdk timeout defines the time sdk maintains the connection to the client before timing out")
@JsonProperty("connection_timeout")
private Duration connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
Expand Down Expand Up @@ -99,8 +95,4 @@ public InvocationType getInvocationType() {
return invocationType;
}

public String getWhenCondition() {
return whenCondition;
}

}
Loading

0 comments on commit 430af31

Please sign in to comment.