Skip to content

Commit

Permalink
InMemoryBuffer related test
Browse files Browse the repository at this point in the history
Signed-off-by: Santhosh Gandhe <[email protected]>
  • Loading branch information
san81 committed Nov 17, 2024
1 parent 35a04aa commit 46698fd
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.dataprepper.plugins.lambda.common.accumlator;

import java.io.OutputStream;
import java.time.Duration;
import java.util.List;
import org.opensearch.dataprepper.model.event.Event;
Expand All @@ -22,14 +21,10 @@ public interface Buffer {

int getEventCount();

void setEventCount(int eventCount);

Duration getDuration();

InvokeRequest getRequestPayload(String functionName, String invocationType);

OutputStream getOutputStream();

SdkBytes getPayload();

void addRecord(Record<Event> record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -27,134 +26,127 @@
*/
public class InMemoryBuffer implements Buffer {

private final ByteArrayOutputStream byteArrayOutputStream;

private final List<Record<Event>> records;
private final StopWatch bufferWatch;
private final StopWatch lambdaLatencyWatch;
private final OutputCodec requestCodec;
private int eventCount;
private long payloadRequestSize;
private long payloadResponseSize;


public InMemoryBuffer(String batchOptionKeyName) {
byteArrayOutputStream = new ByteArrayOutputStream();
records = new ArrayList<>();
bufferWatch = new StopWatch();
bufferWatch.start();
lambdaLatencyWatch = new StopWatch();
eventCount = 0;
payloadRequestSize = 0;
payloadResponseSize = 0;
// Setup request codec
JsonOutputCodecConfig jsonOutputCodecConfig = new JsonOutputCodecConfig();
jsonOutputCodecConfig.setKeyName(batchOptionKeyName);
requestCodec = new JsonOutputCodec(jsonOutputCodecConfig);
private final ByteArrayOutputStream byteArrayOutputStream;

private final List<Record<Event>> records;
private final StopWatch bufferWatch;
private final StopWatch lambdaLatencyWatch;
private final OutputCodec requestCodec;
private int eventCount;
private long payloadRequestSize;
private long payloadResponseSize;


public InMemoryBuffer(String batchOptionKeyName) {
byteArrayOutputStream = new ByteArrayOutputStream();
records = new ArrayList<>();
bufferWatch = new StopWatch();
bufferWatch.start();
lambdaLatencyWatch = new StopWatch();
eventCount = 0;
payloadRequestSize = 0;
payloadResponseSize = 0;
// Setup request codec
JsonOutputCodecConfig jsonOutputCodecConfig = new JsonOutputCodecConfig();
jsonOutputCodecConfig.setKeyName(batchOptionKeyName);
requestCodec = new JsonOutputCodec(jsonOutputCodecConfig);
}

public void addRecord(Record<Event> record) {
records.add(record);
Event event = record.getData();
try {
if (eventCount == 0) {
requestCodec.start(this.byteArrayOutputStream, event, new OutputCodecContext());
}
requestCodec.writeEvent(event, this.byteArrayOutputStream);
} catch (IOException e) {
throw new RuntimeException(e);
}

public void addRecord(Record<Event> record) {
records.add(record);
Event event = record.getData();
try {
if (eventCount == 0) {
requestCodec.start(this.byteArrayOutputStream, event, new OutputCodecContext());
}
requestCodec.writeEvent(event, this.byteArrayOutputStream);
} catch (IOException e) {
throw new RuntimeException(e);
}
eventCount++;
eventCount++;
}

public List<Record<Event>> getRecords() {
return records;
}

@Override
public long getSize() {
return byteArrayOutputStream.size();
}

@Override
public int getEventCount() {
return eventCount;
}

public Duration getDuration() {
return Duration.ofMillis(bufferWatch.getTime(TimeUnit.MILLISECONDS));
}

public void reset() {
byteArrayOutputStream.reset();
eventCount = 0;
bufferWatch.reset();
lambdaLatencyWatch.reset();
payloadRequestSize = 0;
payloadResponseSize = 0;
}

@Override
public InvokeRequest getRequestPayload(String functionName, String invocationType) {

if (eventCount == 0) {
//We never added any events so there is no payload
return null;
}

public List<Record<Event>> getRecords() {
return records;
try {
requestCodec.complete(this.byteArrayOutputStream);
} catch (IOException e) {
throw new RuntimeException(e);
}

@Override
public long getSize() {
return byteArrayOutputStream.size();
}
SdkBytes payload = getPayload();
payloadRequestSize = payload.asByteArray().length;

@Override
public int getEventCount() {
return eventCount;
}

@Override
public void setEventCount(int eventCount) {
this.eventCount = eventCount;
}
// Setup an InvokeRequest.
InvokeRequest request = InvokeRequest.builder()
.functionName(functionName)
.payload(payload)
.invocationType(invocationType)
.build();

public Duration getDuration() {
return Duration.ofMillis(bufferWatch.getTime(TimeUnit.MILLISECONDS));
}

public void reset() {
byteArrayOutputStream.reset();
eventCount = 0;
bufferWatch.reset();
synchronized (this) {
if (lambdaLatencyWatch.isStarted()) {
lambdaLatencyWatch.reset();
payloadRequestSize = 0;
payloadResponseSize = 0;
}

@Override
public InvokeRequest getRequestPayload(String functionName, String invocationType) {

try {
requestCodec.complete(this.byteArrayOutputStream);
} catch (IOException e) {
throw new RuntimeException(e);
}

SdkBytes payload = getPayload();
payloadRequestSize = payload.asByteArray().length;

// Setup an InvokeRequest.
InvokeRequest request = InvokeRequest.builder()
.functionName(functionName)
.payload(payload)
.invocationType(invocationType)
.build();

synchronized (this) {
if (lambdaLatencyWatch.isStarted()) {
lambdaLatencyWatch.reset();
}
lambdaLatencyWatch.start();
}
return request;
}

public synchronized Duration stopLatencyWatch() {
if (lambdaLatencyWatch.isStarted()) {
lambdaLatencyWatch.stop();
}
long timeInMillis = lambdaLatencyWatch.getTime();
return Duration.ofMillis(timeInMillis);
}


@Override
public OutputStream getOutputStream() {
return byteArrayOutputStream;
}

@Override
public SdkBytes getPayload() {
byte[] bytes = byteArrayOutputStream.toByteArray();
SdkBytes sdkBytes = SdkBytes.fromByteArray(bytes);
return sdkBytes;
}

public Duration getFlushLambdaLatencyMetric() {
return Duration.ofMillis(lambdaLatencyWatch.getTime(TimeUnit.MILLISECONDS));
}
lambdaLatencyWatch.start();
}
return request;
}

public Long getPayloadRequestSize() {
return payloadRequestSize;
public synchronized Duration stopLatencyWatch() {
if (lambdaLatencyWatch.isStarted()) {
lambdaLatencyWatch.stop();
}
long timeInMillis = lambdaLatencyWatch.getTime();
return Duration.ofMillis(timeInMillis);
}

@Override
public SdkBytes getPayload() {
byte[] bytes = byteArrayOutputStream.toByteArray();
return SdkBytes.fromByteArray(bytes);
}

public Duration getFlushLambdaLatencyMetric() {
return Duration.ofMillis(lambdaLatencyWatch.getTime(TimeUnit.MILLISECONDS));
}

public Long getPayloadRequestSize() {
return payloadRequestSize;
}

}

Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,22 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.io.OutputStream;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBuffer;
import org.opensearch.dataprepper.plugins.lambda.common.config.InvocationType;
import software.amazon.awssdk.core.SdkBytes;
Expand All @@ -42,26 +45,25 @@ class InMemoryBufferTest {
private final String batchOptionKeyName = "bathOption";
@Mock
private LambdaAsyncClient lambdaAsyncClient;
private InMemoryBuffer inMemoryBuffer;

@Test
void test_with_write_event_into_buffer() throws IOException {
inMemoryBuffer = new InMemoryBuffer(batchOptionKeyName);

@Test
void test_with_write_event_into_buffer() {
InMemoryBuffer inMemoryBuffer = new InMemoryBuffer(batchOptionKeyName);
//UUID based random event created. Each UUID string is of 36 characters long
int eachEventSize = 36;
long sizeToAssert = eachEventSize * MAX_EVENTS;
while (inMemoryBuffer.getEventCount() < MAX_EVENTS) {
OutputStream outputStream = inMemoryBuffer.getOutputStream();
outputStream.write(generateByteArray());
int eventCount = inMemoryBuffer.getEventCount() + 1;
inMemoryBuffer.setEventCount(eventCount);
inMemoryBuffer.addRecord(getSampleRecord());
}
assertThat(inMemoryBuffer.getSize(), greaterThanOrEqualTo(54110L));
assertThat(inMemoryBuffer.getSize(), greaterThanOrEqualTo(sizeToAssert));
assertThat(inMemoryBuffer.getEventCount(), equalTo(MAX_EVENTS));
assertThat(inMemoryBuffer.getDuration(), notNullValue());
assertThat(inMemoryBuffer.getDuration(), greaterThanOrEqualTo(Duration.ZERO));
}

@Test
void test_with_write_event_into_buffer_and_flush_toLambda() throws IOException {
void test_with_write_event_into_buffer_and_flush_toLambda() {

// Mock the response of the invoke method
InvokeResponse mockResponse = InvokeResponse.builder()
Expand All @@ -72,12 +74,9 @@ void test_with_write_event_into_buffer_and_flush_toLambda() throws IOException {
CompletableFuture<InvokeResponse> future = CompletableFuture.completedFuture(mockResponse);
when(lambdaAsyncClient.invoke(any(InvokeRequest.class))).thenReturn(future);

inMemoryBuffer = new InMemoryBuffer(batchOptionKeyName);
InMemoryBuffer inMemoryBuffer = new InMemoryBuffer(batchOptionKeyName);
while (inMemoryBuffer.getEventCount() < MAX_EVENTS) {
OutputStream outputStream = inMemoryBuffer.getOutputStream();
outputStream.write(generateByteArray());
int eventCount = inMemoryBuffer.getEventCount() + 1;
inMemoryBuffer.setEventCount(eventCount);
inMemoryBuffer.addRecord(getSampleRecord());
}
assertDoesNotThrow(() -> {
InvokeRequest requestPayload = inMemoryBuffer.getRequestPayload(
Expand All @@ -88,8 +87,13 @@ void test_with_write_event_into_buffer_and_flush_toLambda() throws IOException {
});
}

private Record<Event> getSampleRecord() {
Event event = JacksonEvent.fromMessage(String.valueOf(UUID.randomUUID()));
return new Record<>(event);
}

@Test
void test_uploadedToLambda_success() throws IOException {
void test_uploadedToLambda_success() {
// Mock the response of the invoke method
InvokeResponse mockResponse = InvokeResponse.builder()
.statusCode(200) // HTTP 200 for successful invocation
Expand All @@ -100,11 +104,9 @@ void test_uploadedToLambda_success() throws IOException {
CompletableFuture<InvokeResponse> future = CompletableFuture.completedFuture(mockResponse);
when(lambdaAsyncClient.invoke(any(InvokeRequest.class))).thenReturn(future);

inMemoryBuffer = new InMemoryBuffer(batchOptionKeyName);
InMemoryBuffer inMemoryBuffer = new InMemoryBuffer(batchOptionKeyName);
assertNotNull(inMemoryBuffer);
OutputStream outputStream = inMemoryBuffer.getOutputStream();
outputStream.write(generateByteArray());
inMemoryBuffer.setEventCount(1);
inMemoryBuffer.addRecord(getSampleRecord());

assertDoesNotThrow(() -> {
InvokeRequest requestPayload = inMemoryBuffer.getRequestPayload(
Expand All @@ -125,9 +127,11 @@ void test_uploadedToLambda_fails() {

when(lambdaAsyncClient.invoke(any(InvokeRequest.class))).thenReturn(future);

inMemoryBuffer = new InMemoryBuffer(batchOptionKeyName);
InMemoryBuffer inMemoryBuffer = new InMemoryBuffer(batchOptionKeyName);
assertNotNull(inMemoryBuffer);

assertNull(inMemoryBuffer.getRequestPayload(functionName, invocationType));
inMemoryBuffer.addRecord(getSampleRecord());
// Execute and assert exception
CompletionException exception = assertThrows(CompletionException.class, () -> {
InvokeRequest requestPayload = inMemoryBuffer.getRequestPayload(
Expand Down

0 comments on commit 46698fd

Please sign in to comment.