Skip to content

Commit

Permalink
Cancel the existing grok task when a timeout occurs. Resolves #4026 (#…
Browse files Browse the repository at this point in the history
…4027) (#4028)

Signed-off-by: David Venable <[email protected]>
(cherry picked from commit 7616396)

Co-authored-by: David Venable <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and dlvenable authored Jan 30, 2024
1 parent f7b9b4d commit 5d25021
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,12 @@ private boolean shouldBreakOnMatch(final Map<String, Object> captures) {
}

private void runWithTimeout(final Runnable runnable) throws TimeoutException, ExecutionException, InterruptedException {
Future<?> task = executorService.submit(runnable);
task.get(grokProcessorConfig.getTimeoutMillis(), TimeUnit.MILLISECONDS);
final Future<?> task = executorService.submit(runnable);
try {
task.get(grokProcessorConfig.getTimeoutMillis(), TimeUnit.MILLISECONDS);
} catch (final TimeoutException exception) {
task.cancel(true);
throw exception;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.processor.grok.GrokProcessor.EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT;
import static org.opensearch.dataprepper.test.matcher.MapEquals.isEqualWithoutTimestamp;
Expand Down Expand Up @@ -155,7 +156,7 @@ private GrokProcessor createObjectUnderTest() {
}

@Test
public void testMatchMerge() throws JsonProcessingException {
public void testMatchMerge() throws JsonProcessingException, ExecutionException, InterruptedException, TimeoutException {
grokProcessor = createObjectUnderTest();

capture.put("key_capture_1", "value_capture_1");
Expand All @@ -182,10 +183,12 @@ public void testMatchMerge() throws JsonProcessingException {
verify(grokProcessingMatchCounter, times(1)).increment();
verify(grokProcessingTime, times(1)).record(any(Runnable.class));
verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMismatchCounter, grokProcessingTimeoutsCounter);
verify(task).get(GrokProcessorConfig.DEFAULT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
verifyNoMoreInteractions(task);
}

@Test
public void testTarget() throws JsonProcessingException {
public void testTarget() throws JsonProcessingException, ExecutionException, InterruptedException, TimeoutException {
pluginSetting.getSettings().put(GrokProcessorConfig.TARGET_KEY, "test_target");
grokProcessor = createObjectUnderTest();

Expand Down Expand Up @@ -216,6 +219,8 @@ public void testTarget() throws JsonProcessingException {
verify(grokProcessingMatchCounter, times(1)).increment();
verify(grokProcessingTime, times(1)).record(any(Runnable.class));
verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMismatchCounter, grokProcessingTimeoutsCounter);
verify(task).get(GrokProcessorConfig.DEFAULT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
verifyNoMoreInteractions(task);
}

@Test
Expand Down Expand Up @@ -251,7 +256,7 @@ public void testOverwrite() throws JsonProcessingException {
}

@Test
public void testMatchMergeCollisionStrings() throws JsonProcessingException {
public void testMatchMergeCollisionStrings() throws JsonProcessingException, ExecutionException, InterruptedException, TimeoutException {
grokProcessor = createObjectUnderTest();

capture.put("key_capture_1", "value_capture_1");
Expand Down Expand Up @@ -398,6 +403,7 @@ public void testThatTimeoutExceptionIsCaughtAndProcessingContinues() throws Json
assertThat(grokkedRecords.size(), equalTo(1));
assertThat(grokkedRecords.get(0), notNullValue());
assertRecordsAreEqual(grokkedRecords.get(0), record);
verify(task).cancel(true);
verify(grokProcessingTimeoutsCounter, times(1)).increment();
verify(grokProcessingTime, times(1)).record(any(Runnable.class));
}
Expand Down

0 comments on commit 5d25021

Please sign in to comment.