diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/README.md b/data-prepper-plugins/saas-source-plugins/jira-source/README.md index 5845729319..f2a1148a2e 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/README.md +++ b/data-prepper-plugins/saas-source-plugins/jira-source/README.md @@ -6,4 +6,3 @@ ### Timer - `requestProcessDuration`: measures latency of requests processed by the jira source plugin. - diff --git a/data-prepper-plugins/saas-source-plugins/saas-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/SaasPluginExecutorServiceProvider.java b/data-prepper-plugins/saas-source-plugins/saas-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/SaasPluginExecutorServiceProvider.java index ec6bce6c62..5e256d4033 100644 --- a/data-prepper-plugins/saas-source-plugins/saas-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/SaasPluginExecutorServiceProvider.java +++ b/data-prepper-plugins/saas-source-plugins/saas-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/SaasPluginExecutorServiceProvider.java @@ -3,8 +3,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.PreDestroy; import javax.inject.Named; +import javax.annotation.PreDestroy; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -13,8 +13,16 @@ public class SaasPluginExecutorServiceProvider { Logger log = LoggerFactory.getLogger(SaasPluginExecutorServiceProvider.class); public static final int DEFAULT_THREAD_COUNT = 50; + private final ExecutorService executorService; - private final ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT); + public SaasPluginExecutorServiceProvider() { + executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT); + } + + //Constructor for testing + public SaasPluginExecutorServiceProvider(ExecutorService testExecutorService) { + executorService = testExecutorService; + } public ExecutorService get() { return executorService; @@ -35,4 +43,4 @@ public void terminateExecutor() { executorService.shutdownNow(); } } -} +} \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/CrawlerTest.java b/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/CrawlerTest.java index 672b5fb4a3..a6ed381723 100644 --- a/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/CrawlerTest.java +++ b/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/CrawlerTest.java @@ -16,7 +16,9 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; @@ -45,10 +47,8 @@ public class CrawlerTest { @Mock private ItemInfo item; - private Crawler crawler; - @BeforeEach public void setup() { crawler = new Crawler(client); @@ -101,10 +101,86 @@ void testCrawlWithMultiplePartitions() throws NoSuchFieldException, IllegalAcce } + @Test + void testCrawlWithNullItemsInList() throws NoSuchFieldException, IllegalAccessException { + long lastPollTime = 0; + List itemInfoList = new ArrayList<>(); + int maxItemsPerPage = getMaxItemsPerPage(); + itemInfoList.add(null); + for (int i = 0; i < maxItemsPerPage-1; i++) { + itemInfoList.add(item); + } + when(client.listItems()).thenReturn(itemInfoList.iterator()); + crawler.crawl(lastPollTime, coordinator); + verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class)); + } + + @Test + void testUpdatingPollTimeNullMetaData() { + long lastPollTime = 0; + List itemInfoList = new ArrayList<>(); + ItemInfo testItem = createTestItemInfo("1",null, null); + itemInfoList.add(testItem); + when(client.listItems()).thenReturn(itemInfoList.iterator()); + long updatedPollTime = crawler.crawl(lastPollTime, coordinator); + assert(updatedPollTime != 0); + } + + @Test + void testUpdatedPollTimeNiCreatedLarger() { + long lastPollTime = 0; + List itemInfoList = new ArrayList<>(); + ItemInfo testItem = createTestItemInfo("1","10" , "5"); + itemInfoList.add(testItem); + when(client.listItems()).thenReturn(itemInfoList.iterator()); + long updatedPollTime = crawler.crawl(lastPollTime, coordinator); + assert(updatedPollTime == 11); + } + @Test + void testUpdatedPollTimeNiUpdatedLarger() { + long lastPollTime = 0; + List itemInfoList = new ArrayList<>(); + ItemInfo testItem = createTestItemInfo("1","5" , "10"); + itemInfoList.add(testItem); + when(client.listItems()).thenReturn(itemInfoList.iterator()); + long updatedPollTime = crawler.crawl(lastPollTime, coordinator); + assert(updatedPollTime == 11); + } + + private int getMaxItemsPerPage() throws NoSuchFieldException, IllegalAccessException { - Field maxItemsPerPageField = Crawler.class.getDeclaredField("maxItemsPerPage"); - maxItemsPerPageField.setAccessible(true); - return (int) maxItemsPerPageField.get(null); -} + Field maxItemsPerPageField = Crawler.class.getDeclaredField("maxItemsPerPage"); + maxItemsPerPageField.setAccessible(true); + return (int) maxItemsPerPageField.get(null); + } + + private static class TestItemInfo extends ItemInfo { + public TestItemInfo(String itemId, Map metadata, Long eventTime) { + super(itemId, metadata, eventTime); + + } + @Override + public String getPartitionKey() { + return getItemId(); + } + + @Override + public String getId() { + return getItemId(); + } + + @Override + public Map getKeyAttributes() { + return new HashMap<>(); + } + } + + private ItemInfo createTestItemInfo(String id, String created, String updated) { + Map metadata = new HashMap<>(); + if (created != null) metadata.put(Crawler.CREATED, created); + if (updated != null) metadata.put(Crawler.UPDATED, updated); + + return new TestItemInfo(id, metadata, System.currentTimeMillis()); + } -} +} \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/SaasPluginExecutorServiceProviderTest.java b/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/SaasPluginExecutorServiceProviderTest.java index 787ec8c601..c6dacd0694 100644 --- a/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/SaasPluginExecutorServiceProviderTest.java +++ b/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/SaasPluginExecutorServiceProviderTest.java @@ -4,12 +4,18 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) public class SaasPluginExecutorServiceProviderTest { @@ -17,6 +23,10 @@ public class SaasPluginExecutorServiceProviderTest { private SaasPluginExecutorServiceProvider provider; private ExecutorService executorService; + private SaasPluginExecutorServiceProvider provider2; + @Mock + private ExecutorService mockExecutorService; + @BeforeEach void setUp() { provider = new SaasPluginExecutorServiceProvider(); @@ -34,11 +44,26 @@ void testConstruction() { assertNotNull(provider); } - @Test void testTerminateExecutor() { provider.terminateExecutor(); assertTrue(executorService.isShutdown()); assertTrue(executorService.isTerminated()); } -} + + @Test + void terminateExecutorInterruptionTest() throws InterruptedException { + provider2 = new SaasPluginExecutorServiceProvider(mockExecutorService); + when(mockExecutorService.awaitTermination(anyLong(),any(TimeUnit.class))).thenThrow(new InterruptedException()); + AtomicBoolean wasInterrupted = new AtomicBoolean(false); + + Thread testThread = new Thread(() -> { + provider2.terminateExecutor(); + wasInterrupted.set(Thread.interrupted()); + }); + testThread.start(); + testThread.join(); + + assertTrue(wasInterrupted.get()); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/SaasSourcePluginTest.java b/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/SaasSourcePluginTest.java index 6b05df3dce..51f21e9226 100644 --- a/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/SaasSourcePluginTest.java +++ b/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/SaasSourcePluginTest.java @@ -2,6 +2,7 @@ import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -55,13 +56,25 @@ public class SaasSourcePluginTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock + SourcePartitionStoreItem mockItem; + + @Mock + EnhancedSourcePartition mockPartition; + @Mock private EnhancedSourceCoordinator sourceCoordinator; - private SaasSourcePlugin saasSourcePlugin; + private testSaasSourcePlugin saasSourcePlugin; - static class TestSaasSourcePlugin extends SaasSourcePlugin { - public TestSaasSourcePlugin(PluginMetrics pluginMetrics, SaasSourceConfig sourceConfig, PluginFactory pluginFactory, AcknowledgementSetManager acknowledgementSetManager, Crawler crawler, SaasPluginExecutorServiceProvider executorServiceProvider) { + @Nested + public class testSaasSourcePlugin extends SaasSourcePlugin { + public testSaasSourcePlugin(final PluginMetrics pluginMetrics, + final SaasSourceConfig sourceConfig, + final PluginFactory pluginFactory, + final AcknowledgementSetManager acknowledgementSetManager, + final Crawler crawler, + final SaasPluginExecutorServiceProvider executorServiceProvider) { super(pluginMetrics, sourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider); } } @@ -69,7 +82,7 @@ public TestSaasSourcePlugin(PluginMetrics pluginMetrics, SaasSourceConfig source @BeforeEach void setUp() { when(executorServiceProvider.get()).thenReturn(executorService); - saasSourcePlugin = new TestSaasSourcePlugin(pluginMetrics, sourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider); + saasSourcePlugin = new testSaasSourcePlugin(pluginMetrics, sourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider); } @Test @@ -84,6 +97,16 @@ void testSetEnhancedSourceCoordinator() { verify(sourceCoordinator).initialize(); } + @Test + void areAcknowledgementsEnabledTest() { + assertFalse(saasSourcePlugin.areAcknowledgementsEnabled()); + } + + @Test + void saasSourceConfigGetterTest() { + assertNotNull(saasSourcePlugin.getSourceConfig()); + } + @Test void startTest() { saasSourcePlugin.setEnhancedSourceCoordinator(sourceCoordinator); @@ -93,7 +116,7 @@ void startTest() { } @Test - void testExecutorServiceSchedulersSubmitted(){ + void testExecutorServiceSchedulersSubmitted() { saasSourcePlugin.setEnhancedSourceCoordinator(sourceCoordinator); saasSourcePlugin.start(buffer); verify(executorService, times(1)).submit(any(LeaderScheduler.class)); @@ -121,4 +144,5 @@ void testGetDecoder() { } -} + +} \ No newline at end of file