Skip to content

Commit

Permalink
Indentation changes
Browse files Browse the repository at this point in the history
Signed-off-by: Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Nov 16, 2024
1 parent db88ef9 commit dfe8f0b
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,40 +42,40 @@ public class LambdaCommonHandler {
private final LambdaAsyncClient lambdaAsyncClient;
private final String functionName;
private final String invocationType;
private final LambdaCommonConfig config;
private final String whenCondition;
private final LambdaCommonConfig config;
private final String whenCondition;
BufferFactory bufferFactory;
final InputCodec responseCodec;
final ExpressionEvaluator expressionEvaluator;
JsonOutputCodecConfig jsonOutputCodecConfig;
final InputCodec responseCodec;
final ExpressionEvaluator expressionEvaluator;
JsonOutputCodecConfig jsonOutputCodecConfig;
private final int maxEvents;
private final ByteCount maxBytes;
private final Duration maxCollectionDuration;
private final ResponseEventHandlingStrategy responseStrategy;

public LambdaCommonHandler(final Logger log,
final LambdaAsyncClient lambdaAsyncClient,
final JsonOutputCodecConfig jsonOutputCodecConfig,
final InputCodec responseCodec,
final String whenCondition,
final ExpressionEvaluator expressionEvaluator,
final ResponseEventHandlingStrategy responseStrategy,
final LambdaCommonConfig lambdaCommonConfig) {
this.LOG = log;
public LambdaCommonHandler(final Logger log,
final LambdaAsyncClient lambdaAsyncClient,
final JsonOutputCodecConfig jsonOutputCodecConfig,
final InputCodec responseCodec,
final String whenCondition,
final ExpressionEvaluator expressionEvaluator,
final ResponseEventHandlingStrategy responseStrategy,
final LambdaCommonConfig lambdaCommonConfig) {
this.LOG = log;
this.lambdaAsyncClient = lambdaAsyncClient;
this.responseStrategy = responseStrategy;
this.config = lambdaCommonConfig;
this.jsonOutputCodecConfig = jsonOutputCodecConfig;
this.whenCondition = whenCondition;
this.responseCodec = responseCodec;
this.expressionEvaluator = expressionEvaluator;
this.responseStrategy = responseStrategy;
this.config = lambdaCommonConfig;
this.jsonOutputCodecConfig = jsonOutputCodecConfig;
this.whenCondition = whenCondition;
this.responseCodec = responseCodec;
this.expressionEvaluator = expressionEvaluator;
this.functionName = config.getFunctionName();
this.invocationType = config.getInvocationType().getAwsLambdaValue();
maxEvents = lambdaCommonConfig.getBatchOptions().getThresholdOptions().getEventCount();
maxBytes = lambdaCommonConfig.getBatchOptions().getThresholdOptions().getMaximumSize();
maxCollectionDuration = lambdaCommonConfig.getBatchOptions().getThresholdOptions().getEventCollectTimeOut();
bufferFactory = new InMemoryBufferFactory();
}
}

public LambdaCommonHandler(final Logger log,
final LambdaAsyncClient lambdaAsyncClient,
Expand Down Expand Up @@ -115,51 +115,50 @@ public void waitForFutures(List<CompletableFuture<Void>> futureList) {
}
}

public List<Record<Event>> sendRecords(Collection<Record<Event>> records,
BiConsumer<Buffer, List<Record<Event>>> successHandler, BiConsumer<Buffer, List<Record<Event>>> failureHandler) {
List<Record<Event>> resultRecords = Collections.synchronizedList(new ArrayList());
boolean createNewBuffer = true;
Buffer currentBufferPerBatch = null;
OutputCodec requestCodec = null;
public List<Record<Event>> sendRecords(Collection<Record<Event>> records,
BiConsumer<Buffer, List<Record<Event>>> successHandler, BiConsumer<Buffer, List<Record<Event>>> failureHandler) {
List<Record<Event>> resultRecords = Collections.synchronizedList(new ArrayList());
boolean createNewBuffer = true;
Buffer currentBufferPerBatch = null;
OutputCodec requestCodec = null;
List futureList = new ArrayList<>();
for (Record<Event> record : records) {
final Event event = record.getData();

// If the condition is false, add the event to resultRecords as-is
if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) {
synchronized(resultRecords) {
resultRecords.add(record);
}
synchronized(resultRecords) {
resultRecords.add(record);
}
continue;
}

try {
if (createNewBuffer) {
currentBufferPerBatch = createBuffer(bufferFactory);
requestCodec = new JsonOutputCodec(jsonOutputCodecConfig);
requestCodec.start(currentBufferPerBatch.getOutputStream(), event, new OutputCodecContext());
}
requestCodec.writeEvent(event, currentBufferPerBatch.getOutputStream());
} catch (IOException ex) {
LOG.error("Failed to start or write to request codec");
break;
}
currentBufferPerBatch.addRecord(record);
createNewBuffer = flushToLambdaIfNeeded(resultRecords, currentBufferPerBatch,
requestCodec, futureList, successHandler, failureHandler, false);
}
try {
if (createNewBuffer) {
currentBufferPerBatch = createBuffer(bufferFactory);
requestCodec = new JsonOutputCodec(jsonOutputCodecConfig);
requestCodec.start(currentBufferPerBatch.getOutputStream(), event, new OutputCodecContext());
}
requestCodec.writeEvent(event, currentBufferPerBatch.getOutputStream());
} catch (IOException ex) {
LOG.error("Failed to start or write to request codec");
break;
}
currentBufferPerBatch.addRecord(record);
createNewBuffer = flushToLambdaIfNeeded(resultRecords, currentBufferPerBatch,
requestCodec, futureList, successHandler, failureHandler, false);
}
if (!createNewBuffer && currentBufferPerBatch.getEventCount() > 0) {
flushToLambdaIfNeeded(resultRecords, currentBufferPerBatch,
requestCodec, futureList, successHandler, failureHandler, true);
}
flushToLambdaIfNeeded(resultRecords, currentBufferPerBatch,
requestCodec, futureList, successHandler, failureHandler, true);
}
waitForFutures(futureList);
return resultRecords;
}
}

boolean flushToLambdaIfNeeded(List<Record<Event>> resultRecords, Buffer currentBufferPerBatch,
OutputCodec requestCodec, List futureList,
BiConsumer<Buffer, List<Record<Event>>> successHandler, BiConsumer<Buffer, List<Record<Event>>> failureHandler,
boolean forceFlush) {
OutputCodec requestCodec, List futureList, BiConsumer<Buffer, List<Record<Event>>> successHandler,
BiConsumer<Buffer, List<Record<Event>>> failureHandler, boolean forceFlush) {

LOG.debug("currentBufferPerBatchEventCount:{}, maxEvents:{}, maxBytes:{}, " +
"maxCollectionDuration:{}, forceFlush:{} ", currentBufferPerBatch.getEventCount(),
Expand Down Expand Up @@ -202,8 +201,8 @@ boolean flushToLambdaIfNeeded(List<Record<Event>> resultRecords, Buffer currentB
}

private void handleLambdaResponse(List<Record<Event>> resultRecords, Buffer flushedBuffer,
int eventCount, InvokeResponse response,
BiConsumer<Buffer, List<Record<Event>>> successHandler, BiConsumer<Buffer, List<Record<Event>>> failureHandler) {
int eventCount, InvokeResponse response, BiConsumer<Buffer, List<Record<Event>>> successHandler,
BiConsumer<Buffer, List<Record<Event>>> failureHandler) {
boolean success = checkStatusCode(response);
if (success) {
LOG.info("Successfully flushed {} events", eventCount);
Expand Down Expand Up @@ -259,10 +258,10 @@ void convertLambdaResponseToEvent(final List<Record<Event>> resultRecords, final
LOG.debug("Parsed Event Size:{}, FlushedBuffer eventCount:{}, " +
"FlushedBuffer size:{}", parsedEvents.size(), flushedBuffer.getEventCount(),
flushedBuffer.getSize());
synchronized(resultRecords) {
responseStrategy.handleEvents(parsedEvents, originalRecords, resultRecords, flushedBuffer);
successHandler.accept(flushedBuffer, originalRecords);
}
synchronized(resultRecords) {
responseStrategy.handleEvents(parsedEvents, originalRecords, resultRecords, flushedBuffer);
successHandler.accept(flushedBuffer, originalRecords);
}
}
} catch (Exception e) {
LOG.error(NOISY, "Error converting Lambda response to Event");
Expand All @@ -282,11 +281,10 @@ void handleFailure(Throwable e, Buffer flushedBuffer, List<Record<Event>> result
if (flushedBuffer.getEventCount() > 0) {
//numberOfRecordsFailedCounter.increment(flushedBuffer.getEventCount());
}
synchronized(resultRecords) {
failureHandler.accept(flushedBuffer, resultRecords);
}
synchronized(resultRecords) {
failureHandler.accept(flushedBuffer, resultRecords);
}

//addFailureTags(flushedBuffer, resultRecords);
LOG.error(NOISY, "Failed to process batch due to error: ", e);
} catch(Exception ex){
LOG.error(NOISY, "Exception in handleFailure while processing failure for buffer: ", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class LambdaProcessor extends AbstractProcessor<Record<Event>, Record<Eve
LambdaCommonHandler lambdaCommonHandler;
final PluginSetting codecPluginSetting;
final PluginFactory pluginFactory;
final LambdaProcessorConfig lambdaProcessorConfig;
final LambdaProcessorConfig lambdaProcessorConfig;
private final ResponseEventHandlingStrategy responseStrategy;
private final JsonOutputCodecConfig jsonOutputCodecConfig;

Expand All @@ -70,7 +70,7 @@ public LambdaProcessor(final PluginFactory pluginFactory, final PluginMetrics pl
super(pluginMetrics);
this.expressionEvaluator = expressionEvaluator;
this.pluginFactory = pluginFactory;
this.lambdaProcessorConfig = lambdaProcessorConfig;
this.lambdaProcessorConfig = lambdaProcessorConfig;
this.numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS);
this.numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED);
this.lambdaLatencyMetric = pluginMetrics.timer(LAMBDA_LATENCY_METRIC);
Expand Down Expand Up @@ -114,8 +114,8 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {

InputCodec responseCodec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSetting);
lambdaCommonHandler = new LambdaCommonHandler(LOG, lambdaAsyncClient, jsonOutputCodecConfig, responseCodec, whenCondition, expressionEvaluator, responseStrategy, lambdaProcessorConfig);
return lambdaCommonHandler.sendRecords(records, (inputBuffer, resultRecords)->{}, (inputBuffer, resultRecords)->{ addFailureTags(inputBuffer, resultRecords);});
}
return lambdaCommonHandler.sendRecords(records, (inputBuffer, resultRecords)->{}, (inputBuffer, resultRecords)->{ addFailureTags(inputBuffer, resultRecords);});
}

private void addFailureTags(Buffer flushedBuffer, List<Record<Event>> resultRecords) {
// Add failure tags to each event in the batch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class LambdaProcessorConfigTest {
private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));

@Test
void test_defaults() {
void test_defaults() {
final LambdaProcessorConfig lambdaProcessorConfig = new LambdaProcessorConfig();
assertThat(lambdaProcessorConfig.getTagsOnMatchFailure(), equalTo(List.of()));
assertThat(lambdaProcessorConfig.getWhenCondition(), equalTo(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void setUp() throws Exception {
when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.ofSeconds(30));
when(batchOptions.getKeyName()).thenReturn("key");

// Mock Response Codec Configuration
// Mock Response Codec Configuration
PluginModel responseCodecConfig = lambdaProcessorConfig.getResponseCodecConfig();
PluginSetting responseCodecPluginSetting;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,12 @@ void lambda_sink_pipeline_config_test_with_no_dlq() throws JsonProcessingExcepti
" max_retries: 10\n";
final LambdaSinkConfig lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class);
assertThat(lambdaSinkConfig.getDlq(),equalTo(null));
/*
assertThat(lambdaSinkConfig.getMaxConnectionRetries(),equalTo(10));
assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsRegion(),equalTo(Region.AP_SOUTH_1));
assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsStsRoleArn(),equalTo("arn:aws:iam::524239988912:role/app-test"));
assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsStsHeaderOverrides().get("test"),equalTo("test"));
assertThat(lambdaSinkConfig.getDlqStsRegion(),equalTo("ap-south-1"));
assertThat(lambdaSinkConfig.getDlqStsRoleARN(),equalTo("arn:aws:iam::524239988912:role/app-test"));
assertThat(lambdaSinkConfig.getDlqPluginSetting().get("key"),equalTo(null));
*/
}
}

0 comments on commit dfe8f0b

Please sign in to comment.