diff --git a/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas_crawler/coordination/scheduler/LeaderSchedulerTest.java b/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas_crawler/coordination/scheduler/LeaderSchedulerTest.java index 533aa7e9c6..1fffa78ed4 100644 --- a/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas_crawler/coordination/scheduler/LeaderSchedulerTest.java +++ b/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas_crawler/coordination/scheduler/LeaderSchedulerTest.java @@ -19,6 +19,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -55,6 +56,7 @@ void testLeaderPartitionsCreation(boolean initializationState) throws Interrupte leaderPartition.getProgressState().get().setInitialized(initializationState); leaderPartition.getProgressState().get().setLastPollTime(0L); given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition)); + doThrow(RuntimeException.class).when(coordinator).saveProgressStateForPartition(any(LeaderPartition.class), any(Duration.class)); ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(leaderScheduler); diff --git a/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas_crawler/coordination/scheduler/WorkerSchedulerTest.java b/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas_crawler/coordination/scheduler/WorkerSchedulerTest.java index b272f22a48..8d7d31f3bb 100644 --- a/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas_crawler/coordination/scheduler/WorkerSchedulerTest.java +++ b/data-prepper-plugins/saas-source-plugins/saas-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/saas_crawler/coordination/scheduler/WorkerSchedulerTest.java @@ -80,7 +80,28 @@ void testLeaderPartitionsCreation() throws InterruptedException { SaasWorkerProgressState stateObj = (SaasWorkerProgressState)sourcePartition.getProgressState().get(); verify(crawler, atLeast(1)).executePartition(stateObj, buffer, sourceConfig); verify(coordinator, atLeast(1)).completePartition(eq(sourcePartition)); + } + + @Test + void testEmptyProgressState() throws InterruptedException { + WorkerScheduler workerScheduler = new WorkerScheduler(buffer, coordinator, sourceConfig, crawler); + + String sourceId = UUID.randomUUID() + "|" + SaasSourcePartition.PARTITION_TYPE; + when(sourcePartitionStoreItem.getPartitionProgressState()).thenReturn(null); + when(sourcePartitionStoreItem.getSourceIdentifier()).thenReturn(sourceId); + PartitionFactory factory = new PartitionFactory(); + EnhancedSourcePartition sourcePartition = factory.apply(sourcePartitionStoreItem); + given(coordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE)).willReturn(Optional.of(sourcePartition)); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(workerScheduler); + + Thread.sleep(50); + executorService.shutdownNow(); + + // Check if crawler was invoked and updated leader lease renewal time + verifyNoInteractions(crawler); + verify(coordinator, atLeast(1)).completePartition(eq(sourcePartition)); } @Test