From ced7a2d57143c7340e9fdf41e484a7f38a52d802 Mon Sep 17 00:00:00 2001 From: Srikanth Govindarajan Date: Fri, 24 Jan 2025 17:29:29 -0800 Subject: [PATCH] Address concurrency/synchronization comment Signed-off-by: Srikanth Govindarajan --- .../lambda/common/LambdaCommonHandler.java | 5 + .../InMemoryBufferSynchronized.java | 156 ++++++++++++++++++ .../plugins/lambda/sink/LambdaSink.java | 19 +-- .../InMemoryBufferSynchronizedTest.java | 142 ++++++++++++++++ .../plugins/lambda/sink/LambdaSinkTest.java | 59 +++---- 5 files changed, 343 insertions(+), 38 deletions(-) create mode 100644 data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBufferSynchronized.java create mode 100644 data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferSynchronizedTest.java diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/LambdaCommonHandler.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/LambdaCommonHandler.java index 3518508b64..1c31b2174c 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/LambdaCommonHandler.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/LambdaCommonHandler.java @@ -84,6 +84,11 @@ public static Map> sendRecords( List batchedBuffers = createBufferBatches(records, config.getBatchOptions(), outputCodecContext); + Map> bufferToFutureMap = invokeLambdaAndGetFutureMap(config, lambdaAsyncClient, batchedBuffers); + return bufferToFutureMap; + } + + public static Map> invokeLambdaAndGetFutureMap(LambdaCommonConfig config, LambdaAsyncClient lambdaAsyncClient, List batchedBuffers) { Map> bufferToFutureMap = new HashMap<>(); LOG.debug("Batch Chunks created after threshold check: {}", batchedBuffers.size()); for (Buffer buffer : batchedBuffers) { diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBufferSynchronized.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBufferSynchronized.java new file mode 100644 index 0000000000..a63568a33b --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBufferSynchronized.java @@ -0,0 +1,156 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.lambda.common.accumlator; + +import org.apache.commons.lang3.time.StopWatch; +import org.opensearch.dataprepper.model.codec.OutputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; +import org.opensearch.dataprepper.plugins.codec.json.JsonOutputCodec; +import org.opensearch.dataprepper.plugins.codec.json.JsonOutputCodecConfig; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.lambda.model.InvokeRequest; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * A buffer can hold in memory data and flushing it. + */ +public class InMemoryBufferSynchronized implements Buffer { + + private final ByteArrayOutputStream byteArrayOutputStream; + + private final List> records; + private final StopWatch bufferWatch; + private final StopWatch lambdaLatencyWatch; + private final OutputCodec requestCodec; + private final OutputCodecContext outputCodecContext; + private final long payloadResponseSize; + private int eventCount; + private long payloadRequestSize; + + + public InMemoryBufferSynchronized(String batchOptionKeyName) { + this(batchOptionKeyName, new OutputCodecContext()); + } + + public InMemoryBufferSynchronized(String batchOptionKeyName, OutputCodecContext outputCodecContext) { + byteArrayOutputStream = new ByteArrayOutputStream(); + records = Collections.synchronizedList(new ArrayList<>()); + bufferWatch = new StopWatch(); + bufferWatch.start(); + lambdaLatencyWatch = new StopWatch(); + eventCount = 0; + payloadRequestSize = 0; + payloadResponseSize = 0; + // Setup request codec + JsonOutputCodecConfig jsonOutputCodecConfig = new JsonOutputCodecConfig(); + jsonOutputCodecConfig.setKeyName(batchOptionKeyName); + requestCodec = new JsonOutputCodec(jsonOutputCodecConfig); + this.outputCodecContext = outputCodecContext; + } + + /* + * Note: JsonCodec is NOT thread safe, so we need to synchronize this method + */ + @Override + public synchronized void addRecord(Record record) { + records.add(record); + Event event = record.getData(); + try { + if (eventCount == 0) { + requestCodec.start(this.byteArrayOutputStream, event, this.outputCodecContext); + } + requestCodec.writeEvent(event, this.byteArrayOutputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + eventCount++; + } + + @Override + public List> getRecords() { + return records; + } + + @Override + public long getSize() { + return byteArrayOutputStream.size(); + } + + @Override + public int getEventCount() { + return eventCount; + } + + public Duration getDuration() { + return Duration.ofMillis(bufferWatch.getTime(TimeUnit.MILLISECONDS)); + } + + @Override + public InvokeRequest getRequestPayload(String functionName, String invocationType) { + + if (eventCount == 0) { + //We never added any events so there is no payload + return null; + } + + try { + requestCodec.complete(this.byteArrayOutputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + + SdkBytes payload = getPayload(); + payloadRequestSize = payload.asByteArray().length; + + // Setup an InvokeRequest. + InvokeRequest request = InvokeRequest.builder() + .functionName(functionName) + .payload(payload) + .invocationType(invocationType) + .build(); + + synchronized (this) { + if (lambdaLatencyWatch.isStarted()) { + lambdaLatencyWatch.reset(); + } + lambdaLatencyWatch.start(); + } + return request; + } + + public synchronized Duration stopLatencyWatch() { + if (lambdaLatencyWatch.isStarted()) { + lambdaLatencyWatch.stop(); + } + long timeInMillis = lambdaLatencyWatch.getTime(); + return Duration.ofMillis(timeInMillis); + } + + @Override + public SdkBytes getPayload() { + byte[] bytes = byteArrayOutputStream.toByteArray(); + return SdkBytes.fromByteArray(bytes); + } + + public Duration getFlushLambdaLatencyMetric() { + return Duration.ofMillis(lambdaLatencyWatch.getTime(TimeUnit.MILLISECONDS)); + } + + public Long getPayloadRequestSize() { + return payloadRequestSize; + } + +} + diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java index 6483861377..c9bd87a4da 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java @@ -25,7 +25,7 @@ import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.lambda.common.LambdaCommonHandler; import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; -import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBuffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferSynchronized; import org.opensearch.dataprepper.plugins.lambda.common.client.LambdaClientFactory; import org.opensearch.dataprepper.plugins.lambda.common.config.ClientOptions; import org.opensearch.dataprepper.plugins.lambda.common.util.ThresholdCheck; @@ -152,7 +152,7 @@ public void doInitialize() { private void doInitializeInternal() { // Initialize the partial buffer - statefulBuffer = new InMemoryBuffer( + statefulBuffer = new InMemoryBufferSynchronized( lambdaSinkConfig.getBatchOptions().getKeyName(), outputCodecContext ); @@ -164,7 +164,7 @@ private void doInitializeInternal() { * do a time-based flush. */ @Override - public synchronized void shutdown() { + public void shutdown() { // Flush the partial buffer if any leftover if (statefulBuffer.getEventCount() > 0) { flushBuffers(Collections.singletonList(statefulBuffer)); @@ -172,7 +172,7 @@ public synchronized void shutdown() { } @Override - public synchronized void doOutput(final Collection> records) { + public void doOutput(final Collection> records) { if (!sinkInitialized) { LOG.warn("LambdaSink doOutput called before initialization"); return; @@ -193,7 +193,7 @@ public synchronized void doOutput(final Collection> records) { // This buffer is full fullBuffers.add(statefulBuffer); // Create new partial buffer - statefulBuffer = new InMemoryBuffer( + statefulBuffer = new InMemoryBufferSynchronized( lambdaSinkConfig.getBatchOptions().getKeyName(), outputCodecContext ); @@ -225,7 +225,7 @@ private DlqObject createDlqObjectFromEvent(final Event event, .build(); } - synchronized void handleFailure(Collection> failedRecords, Throwable throwable, int statusCode) { + void handleFailure(Collection> failedRecords, Throwable throwable, int statusCode) { if (failedRecords.isEmpty()) { return; } @@ -265,7 +265,7 @@ private void releaseEventHandles(Collection> records, boolean succ } } - private synchronized void flushBuffers(final List buffersToFlush) { + private void flushBuffers(final List buffersToFlush) { // Combine all their records for a single call to sendRecords List> combinedRecords = new ArrayList<>(); for (Buffer buf : buffersToFlush) { @@ -274,11 +274,10 @@ private synchronized void flushBuffers(final List buffersToFlush) { Map> bufferToFutureMap; try { - bufferToFutureMap = LambdaCommonHandler.sendRecords( - combinedRecords, + bufferToFutureMap = LambdaCommonHandler.invokeLambdaAndGetFutureMap( lambdaSinkConfig, lambdaAsyncClient, - outputCodecContext + buffersToFlush ); } catch (Exception e) { LOG.error(NOISY, "Error sending buffers to Lambda", e); diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferSynchronizedTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferSynchronizedTest.java new file mode 100644 index 0000000000..60ae2de0f3 --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferSynchronizedTest.java @@ -0,0 +1,142 @@ +package org.opensearch.dataprepper.plugins.lambda.common.accumulator; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockitoAnnotations; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferSynchronized; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.lambda.model.InvokeRequest; + +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +class InMemoryBufferSynchronizedTest { + private AutoCloseable mocks; + + @BeforeEach + void setUp() { + mocks = MockitoAnnotations.openMocks(this); + } + + @AfterEach + void tearDown() throws Exception { + mocks.close(); + } + + @Test + void testAddRecordAndGetRecords() { + InMemoryBufferSynchronized buffer = new InMemoryBufferSynchronized("testKey"); + + // Initially empty + assertEquals(0, buffer.getEventCount()); + assertTrue(buffer.getRecords().isEmpty()); + assertEquals(0, buffer.getSize()); + + // Add a record + Event event = createSimpleEvent("hello", 123); + buffer.addRecord(new Record<>(event)); + + assertEquals(1, buffer.getEventCount()); + assertEquals(1, buffer.getRecords().size()); + assertTrue(buffer.getSize() > 0, "ByteArrayOutputStream should have some bytes after writing an event"); + } + + @Test + void testGetRequestPayloadWhenEmptyReturnsNull() { + InMemoryBufferSynchronized buffer = new InMemoryBufferSynchronized("testKey"); + // No records added => eventCount=0 + InvokeRequest request = buffer.getRequestPayload("someFunction", "RequestResponse"); + assertNull(request, "Expected null request if no events are in the buffer"); + } + + @Test + void testGetRequestPayloadNonEmpty() { + InMemoryBufferSynchronized buffer = new InMemoryBufferSynchronized("testKey"); + buffer.addRecord(new Record<>(createSimpleEvent("k1", 111))); + buffer.addRecord(new Record<>(createSimpleEvent("k2", 222))); + + // Now we should have 2 events + assertEquals(2, buffer.getEventCount()); + + // getRequestPayload => closes JSON, returns an InvokeRequest + InvokeRequest request = buffer.getRequestPayload("testFunction", "RequestResponse"); + assertNotNull(request); + // Should not be null after we finalize + SdkBytes payload = request.payload(); + assertNotNull(payload); + // The payload should contain some JSON array with 2 items + String payloadString = payload.asUtf8String(); + assertTrue(payloadString.contains("\"k1\":\"111\""), "Expected 'k1' field in JSON"); + assertTrue(payloadString.contains("\"k2\":\"222\""), "Expected 'k2' field in JSON"); + + // Also, verify the payloadRequestSize is set + Long requestSize = buffer.getPayloadRequestSize(); + assertNotNull(requestSize); + assertTrue(requestSize > 0, "Expected a non-zero payload request size"); + } + + @Test + void testConcurrentAddRecords() throws InterruptedException { + InMemoryBufferSynchronized buffer = new InMemoryBufferSynchronized("testKey"); + + int numThreads = 5; + int recordsPerThread = 10; + ExecutorService pool = Executors.newFixedThreadPool(numThreads); + + // Each thread adds 10 records => total 50 + for (int t = 0; t < numThreads; t++) { + pool.submit(() -> { + for (int i = 0; i < recordsPerThread; i++) { + buffer.addRecord(new Record<>(createSimpleEvent("thread", i))); + } + }); + } + pool.shutdown(); + assertTrue(pool.awaitTermination(5, TimeUnit.SECONDS), + "Threads did not finish in time"); + + // Should now have 50 records + assertEquals(numThreads * recordsPerThread, buffer.getEventCount()); + assertEquals(numThreads * recordsPerThread, buffer.getRecords().size()); + + // ensure we get a JSON array with 50 items + InvokeRequest request = buffer.getRequestPayload("threadFunction", "RequestResponse"); + String payloadStr = request.payload().asUtf8String(); + // Just check if it has multiple items + long countOfThread = countOccurrences(payloadStr, "\"thread\":\""); + assertTrue(countOfThread >= numThreads, + "Expected multiple 'thread' fields in the JSON payload, found " + countOfThread); + } + + // Utility to create a simple test event + private Event createSimpleEvent(String key, int value) { + // This is just one possible way to create a test Event + return JacksonEvent.builder() + .withData(Collections.singletonMap(key, String.valueOf(value))) + .withEventType("TEST") + .build(); + } + + // Utility to count occurrences of a substring + private static long countOccurrences(String haystack, String needle) { + long count = 0; + int idx = 0; + while ((idx = haystack.indexOf(needle, idx)) != -1) { + count++; + idx += needle.length(); + } + return count; + } +} diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java index d98ad0c7b3..fc4fc29a7c 100644 --- a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java @@ -44,7 +44,6 @@ import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; @@ -178,13 +177,14 @@ void testNoFlushIfThresholdNotReached() { new Record<>(mock(Event.class)) ); - // We mock the static method, expecting zero calls + // We expect no call to invokeLambdaAndGetFutureMap(...) since threshold not hit try (MockedStatic mockedHandler = mockStatic(LambdaCommonHandler.class)) { + lambdaSink.doOutput(records); - // Because threshold=2 and we only provided 1 event, no flush => 0 calls to sendRecords + // Because threshold=2 and we only provided 1 event, no flush => 0 calls mockedHandler.verify( - () -> LambdaCommonHandler.sendRecords(anyCollection(), any(), any(), any()), + () -> LambdaCommonHandler.invokeLambdaAndGetFutureMap(any(), any(), anyList()), never() ); } @@ -204,44 +204,46 @@ void testFlushWhenThresholdReached() { new Record<>(mock(Event.class)) ); - // Mock the static call to 'sendRecords(...)' to return a completed future + // Mock static calls try (MockedStatic mockedHandler = mockStatic(LambdaCommonHandler.class)) { - // For any invocation of isSuccess(int), call the real method: + // We'll let isSuccess(...) call real method so it checks statusCode mockedHandler.when(() -> LambdaCommonHandler.isSuccess(any())) .thenCallRealMethod(); + final InvokeResponse mockResponse = mock(InvokeResponse.class); when(mockResponse.statusCode()).thenReturn(200); // success when(mockResponse.payload()).thenReturn(SdkBytes.fromUtf8String("{\"msg\":\"OK\"}")); + // Future that returns mockResponse CompletableFuture completedFuture = mock(CompletableFuture.class); when(completedFuture.join()).thenReturn(mockResponse); - // We can return a single mock Buffer -> future + // One buffer => future mapping final Buffer mockBuffer = mock(Buffer.class); when(mockBuffer.getRecords()).thenReturn(records); when(mockBuffer.getEventCount()).thenReturn(2); - Map> resultMap = + final Map> resultMap = Map.of(mockBuffer, completedFuture); + // Now, because flushBuffers(...) calls invokeLambdaAndGetFutureMap(...), + // we mock that: mockedHandler.when(() -> - LambdaCommonHandler.sendRecords(anyCollection(), any(), any(), any()) + LambdaCommonHandler.invokeLambdaAndGetFutureMap(any(), any(), anyList()) ).thenReturn(resultMap); // ACT lambdaSink.doOutput(records); - // VERIFY - // Because threshold=2 => flush => we should see exactly 1 call to sendRecords + // Since threshold=2 => flush => exactly 1 call to invokeLambdaAndGetFutureMap mockedHandler.verify(() -> - LambdaCommonHandler.sendRecords(anyCollection(), any(), any(), any()), + LambdaCommonHandler.invokeLambdaAndGetFutureMap(any(), any(), anyList()), times(1) ); - // The code should treat it as success => increment success counters + // We expect success verify(numberOfRecordsSuccessCounter).increment(2.0); // 2 events verify(numberOfRequestsSuccessCounter).increment(); - // No failures verify(numberOfRecordsFailedCounter, never()).increment(anyDouble()); verify(numberOfRequestsFailedCounter, never()).increment(); } @@ -257,19 +259,20 @@ void testShutdownFlushesPartialIfAny() { try (MockedStatic mockedHandler = mockStatic(LambdaCommonHandler.class)) { mockedHandler.when(() -> LambdaCommonHandler.isSuccess(any())) .thenCallRealMethod(); - // 1) doOutput => partial => no flush + // partial => no flush lambdaSink.doOutput(records); - mockedHandler.verify( - () -> LambdaCommonHandler.sendRecords(anyCollection(), any(), any(), any()), + + mockedHandler.verify(() -> + LambdaCommonHandler.invokeLambdaAndGetFutureMap(any(), any(), anyList()), never() ); - // 2) Now shutdown => flush leftover + // Now shutdown => leftover partial => flush once final InvokeResponse mockResponse = mock(InvokeResponse.class); when(mockResponse.statusCode()).thenReturn(200); when(mockResponse.payload()).thenReturn(SdkBytes.fromUtf8String("{\"msg\":\"OK\"}")); - final CompletableFuture completedFuture = + CompletableFuture completedFuture = CompletableFuture.completedFuture(mockResponse); final Buffer mockBuffer = mock(Buffer.class); @@ -277,24 +280,26 @@ void testShutdownFlushesPartialIfAny() { when(mockBuffer.getEventCount()).thenReturn(1); mockedHandler.when(() -> - LambdaCommonHandler.sendRecords(anyCollection(), any(), any(), any()) + LambdaCommonHandler.invokeLambdaAndGetFutureMap(any(), any(), anyList()) ).thenReturn(Map.of(mockBuffer, completedFuture)); + // Trigger shutdown lambdaSink.shutdown(); - // Now we expect 1 call on shutdown + // We now expect exactly 1 call mockedHandler.verify(() -> - LambdaCommonHandler.sendRecords(anyCollection(), any(), any(), any()), + LambdaCommonHandler.invokeLambdaAndGetFutureMap(any(), any(), anyList()), times(1) ); + // success counters verify(numberOfRecordsSuccessCounter).increment(1.0); verify(numberOfRequestsSuccessCounter).increment(); } } @Test - void testFailureDuringSendRecords() { + void testFailureDuringInvokeLambdaAndGetFutureMap() { // pass 2 => threshold => flush => but an exception is thrown final List> records = List.of( new Record<>(mock(Event.class)), @@ -304,15 +309,14 @@ void testFailureDuringSendRecords() { try (MockedStatic mockedHandler = mockStatic(LambdaCommonHandler.class)) { // cause the method to throw an exception mockedHandler.when(() -> - LambdaCommonHandler.sendRecords(anyCollection(), any(), any(), any()) + LambdaCommonHandler.invokeLambdaAndGetFutureMap(any(), any(), anyList()) ).thenThrow(new RuntimeException("Test flush error")); lambdaSink.doOutput(records); - // We expect the sink to handle that failure => increment fail counters + // We expect fail counters verify(numberOfRecordsFailedCounter).increment(2.0); verify(numberOfRequestsFailedCounter).increment(); - // No success increments verify(numberOfRecordsSuccessCounter, never()).increment(anyDouble()); } } @@ -337,7 +341,7 @@ void testFailureInFutureJoin() { Map.of(bufferMock, failingFuture); mockedHandler.when(() -> - LambdaCommonHandler.sendRecords(anyCollection(), any(), any(), any()) + LambdaCommonHandler.invokeLambdaAndGetFutureMap(any(), any(), anyList()) ).thenReturn(mapResult); lambdaSink.doOutput(records); @@ -349,7 +353,6 @@ void testFailureInFutureJoin() { } } - @Test public void testHandleFailure_WithDlq() throws Exception { Throwable throwable = new RuntimeException("Test Exception");