diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraItemInfo.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraItemInfo.java index 25bc042b8c..0952f20a2a 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraItemInfo.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraItemInfo.java @@ -112,11 +112,6 @@ public JiraItemInfoBuilder withProject(String project) { return this; } - public JiraItemInfoBuilder withIssueType(String issueType) { - this.issueType = issueType; - return this; - } - public JiraItemInfoBuilder withIssueBean(IssueBean issue) { Map issueMetadata = new HashMap<>(); issueMetadata.put(PROJECT_KEY, issue.getProject()); diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraIterator.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraIterator.java index cd5d422047..3027b9ec99 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraIterator.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraIterator.java @@ -1,6 +1,7 @@ package org.opensearch.dataprepper.plugins.source.jira; +import com.google.common.annotations.VisibleForTesting; import lombok.Setter; import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider; import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; @@ -9,15 +10,19 @@ import javax.inject.Named; import java.time.Instant; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.NoSuchElementException; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; @Named public class JiraIterator implements Iterator { + private static final int HAS_NEXT_TIMEOUT = 60; private static final Logger log = LoggerFactory.getLogger(JiraIterator.class); private final JiraSourceConfig sourceConfig; private final JiraService service; @@ -27,6 +32,7 @@ public class JiraIterator implements Iterator { private Queue itemInfoQueue; private Instant lastPollTime; private boolean firstTime = true; + private List> futureList = new ArrayList<>(); public JiraIterator(final JiraService service, PluginExecutorServiceProvider executorServiceProvider, @@ -40,12 +46,41 @@ public JiraIterator(final JiraService service, public boolean hasNext() { if (firstTime) { log.trace("Crawling has been started"); - itemInfoQueue = service.getJiraEntities(sourceConfig, lastPollTime); + startCrawlerThreads(); firstTime = false; } + int timeout = HAS_NEXT_TIMEOUT; + while (isCrawlerRunning() && itemInfoQueue.isEmpty() && timeout > 0) { + try { + log.trace("Waiting for crawler queue to be filled for next {} seconds", timeout); + Thread.sleep(crawlerQWaitTimeMillis); + timeout--; + } catch (InterruptedException e) { + log.error("An exception has occurred while checking for the next document in crawling queue"); + Thread.currentThread().interrupt(); + } + } return !this.itemInfoQueue.isEmpty(); } + private boolean isCrawlerRunning() { + boolean isRunning = false; + if (!futureList.isEmpty()) { + for (Future future : futureList) { + if (!future.isDone()) { + isRunning = true; + break; + } + } + } + return isRunning; + } + + private void startCrawlerThreads() { + futureList.add(crawlerTaskExecutor.submit(() -> + service.getJiraEntities(sourceConfig, lastPollTime, itemInfoQueue), false)); + } + @Override public ItemInfo next() { if (hasNext()) { @@ -66,4 +101,14 @@ public void initialize(Instant jiraChangeLogToken) { this.firstTime = true; } + @VisibleForTesting + public List> showFutureList() { + return futureList; + } + + @VisibleForTesting + public Queue showItemInfoQueue() { + return itemInfoQueue; + } + } diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraService.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraService.java index f2fead1b54..38eb9eed40 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraService.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraService.java @@ -15,15 +15,11 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.ISSUE_KEY; -import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.PROJECT_KEY; import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.UPDATED; import static org.opensearch.dataprepper.plugins.source.jira.utils.JqlConstants.CLOSING_ROUND_BRACKET; import static org.opensearch.dataprepper.plugins.source.jira.utils.JqlConstants.DELIMITER; @@ -65,12 +61,10 @@ public JiraService(JiraSourceConfig jiraSourceConfig, JiraRestClient jiraRestCli * @param configuration the configuration. * @param timestamp timestamp. */ - public Queue getJiraEntities(JiraSourceConfig configuration, Instant timestamp) { + public void getJiraEntities(JiraSourceConfig configuration, Instant timestamp, Queue itemInfoQueue) { log.trace("Started to fetch entities"); - Queue itemInfoQueue = new ConcurrentLinkedQueue<>(); searchForNewTicketsAndAddToQueue(configuration, timestamp, itemInfoQueue); log.trace("Creating item information and adding in queue"); - return itemInfoQueue; } public String getIssue(String issueKey) { @@ -170,21 +164,4 @@ private void validateProjectFilters(JiraSourceConfig configuration) { } } - /** - * Method for creating Item Info. - * - * @param key Input Parameter - * @param metadata Input Parameter - * @return Item Info - */ - private ItemInfo createItemInfo(String key, Map metadata) { - return JiraItemInfo.builder().withEventTime(Instant.now()) - .withId((String) metadata.get(ISSUE_KEY)) - .withItemId(key) - .withMetadata(metadata) - .withProject((String) metadata.get(PROJECT_KEY)) - .withIssueType((String) metadata.get(CONTENT_TYPE)) - .build(); - } - } \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/JiraRestClient.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/JiraRestClient.java index c87c7019e6..6b71a032b8 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/JiraRestClient.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/JiraRestClient.java @@ -7,9 +7,11 @@ import lombok.extern.slf4j.Slf4j; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.source.jira.JiraSourceConfig; +import org.opensearch.dataprepper.plugins.source.jira.exception.BadRequestException; import org.opensearch.dataprepper.plugins.source.jira.exception.UnAuthorizedException; import org.opensearch.dataprepper.plugins.source.jira.models.SearchResults; import org.opensearch.dataprepper.plugins.source.jira.rest.auth.JiraAuthConfig; +import org.opensearch.dataprepper.plugins.source.jira.utils.AddressValidation; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.client.HttpClientErrorException; @@ -93,8 +95,8 @@ public String getIssue(String issueKey) { return invokeRestApi(uri, String.class).getBody(); } - private ResponseEntity invokeRestApi(URI uri, Class responseType) { - + private ResponseEntity invokeRestApi(URI uri, Class responseType) throws BadRequestException{ + AddressValidation.validateInetAddress(AddressValidation.getInetAddress(uri.toString())); int retryCount = 0; while (retryCount < RETRY_ATTEMPT) { try { diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraIteratorTest.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraIteratorTest.java index 9c47bf8e51..8d9dc85869 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraIteratorTest.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraIteratorTest.java @@ -18,6 +18,7 @@ import java.util.NoSuchElementException; import java.util.UUID; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -101,6 +102,38 @@ void testItemInfoQueueNotEmpty() { assertNotNull(jiraIterator.next()); } + @Test + void testStartCrawlerThreads() { + jiraIterator = createObjectUnderTest(); + jiraIterator.initialize(Instant.ofEpochSecond(0)); + jiraIterator.hasNext(); + jiraIterator.hasNext(); + assertTrue(jiraIterator.showFutureList().size() == 1); + } + + @Test + void testFuturesCompleted() throws InterruptedException { + jiraIterator = createObjectUnderTest(); + List mockIssues = new ArrayList<>(); + IssueBean issue1 = createIssueBean(false); + mockIssues.add(issue1); + IssueBean issue2 = createIssueBean(false); + mockIssues.add(issue2); + IssueBean issue3 = createIssueBean(false); + mockIssues.add(issue3); + when(mockSearchResults.getIssues()).thenReturn(mockIssues); + when(mockSearchResults.getTotal()).thenReturn(0); + doReturn(mockSearchResults).when(jiraRestClient).getAllIssues(any(StringBuilder.class), anyInt(), any(JiraSourceConfig.class)); + + jiraIterator.initialize(Instant.ofEpochSecond(0)); + jiraIterator.setCrawlerQWaitTimeMillis(1); + jiraIterator.hasNext(); + + Thread.sleep(1); + jiraIterator.showFutureList().forEach(future -> assertTrue(future.isDone())); + assertEquals(jiraIterator.showItemInfoQueue().size(), mockIssues.size()); + } + @Test void testItemInfoQueueEmpty(){ jiraIterator = createObjectUnderTest(); diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraServiceTest.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraServiceTest.java index 8900936400..af07ab6f0d 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraServiceTest.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraServiceTest.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Queue; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -141,7 +142,8 @@ public void testGetJiraEntities() throws JsonProcessingException { doReturn(mockSearchResults).when(jiraRestClient).getAllIssues(any(StringBuilder.class), anyInt(), any(JiraSourceConfig.class)); Instant timestamp = Instant.ofEpochSecond(0); - Queue itemInfoQueue = jiraService.getJiraEntities(jiraSourceConfig, timestamp); + Queue itemInfoQueue = new ConcurrentLinkedQueue<>(); + jiraService.getJiraEntities(jiraSourceConfig, timestamp, itemInfoQueue); assertEquals(mockIssues.size(), itemInfoQueue.size()); } @@ -166,7 +168,8 @@ public void buildIssueItemInfoMultipleFutureThreads() throws JsonProcessingExcep doReturn(mockSearchResults).when(jiraRestClient).getAllIssues(any(StringBuilder.class), anyInt(), any(JiraSourceConfig.class)); Instant timestamp = Instant.ofEpochSecond(0); - Queue itemInfoQueue = jiraService.getJiraEntities(jiraSourceConfig, timestamp); + Queue itemInfoQueue = new ConcurrentLinkedQueue<>(); + jiraService.getJiraEntities(jiraSourceConfig, timestamp, itemInfoQueue); assertTrue(itemInfoQueue.size() >= 100); } @@ -186,7 +189,9 @@ public void testBadProjectKeys() throws JsonProcessingException { JiraService jiraService = new JiraService(jiraSourceConfig, jiraRestClient); Instant timestamp = Instant.ofEpochSecond(0); - assertThrows(BadRequestException.class, () -> jiraService.getJiraEntities(jiraSourceConfig, timestamp)); + Queue itemInfoQueue = new ConcurrentLinkedQueue<>(); + + assertThrows(BadRequestException.class, () -> jiraService.getJiraEntities(jiraSourceConfig, timestamp, itemInfoQueue)); } @Test @@ -201,7 +206,9 @@ public void testGetJiraEntitiesException() throws JsonProcessingException { doThrow(RuntimeException.class).when(jiraRestClient).getAllIssues(any(StringBuilder.class), anyInt(), any(JiraSourceConfig.class)); Instant timestamp = Instant.ofEpochSecond(0); - assertThrows(RuntimeException.class, () -> jiraService.getJiraEntities(jiraSourceConfig, timestamp)); + Queue itemInfoQueue = new ConcurrentLinkedQueue<>(); + + assertThrows(RuntimeException.class, () -> jiraService.getJiraEntities(jiraSourceConfig, timestamp, itemInfoQueue)); } diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/rest/JiraRestClientTest.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/rest/JiraRestClientTest.java index d79372c65b..d294b4f599 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/rest/JiraRestClientTest.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/rest/JiraRestClientTest.java @@ -11,6 +11,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.plugins.source.jira.JiraServiceTest; import org.opensearch.dataprepper.plugins.source.jira.JiraSourceConfig; +import org.opensearch.dataprepper.plugins.source.jira.exception.BadRequestException; import org.opensearch.dataprepper.plugins.source.jira.exception.UnAuthorizedException; import org.opensearch.dataprepper.plugins.source.jira.models.SearchResults; import org.opensearch.dataprepper.plugins.source.jira.rest.auth.JiraAuthConfig; @@ -108,7 +109,7 @@ public void testGetAllIssuesOauth2() throws JsonProcessingException { JiraSourceConfig jiraSourceConfig = JiraServiceTest.createJiraConfiguration(OAUTH2, issueType, issueStatus, projectKey); JiraRestClient jiraRestClient = new JiraRestClient(restTemplate, authConfig); SearchResults mockSearchResults = mock(SearchResults.class); - doReturn("http://mock-service.jira.com").when(authConfig).getUrl(); + doReturn("http://mock-service.jira.com/").when(authConfig).getUrl(); doReturn(new ResponseEntity<>(mockSearchResults, HttpStatus.OK)).when(restTemplate).getForEntity(any(URI.class), any(Class.class)); SearchResults results = jiraRestClient.getAllIssues(jql, 0, jiraSourceConfig); assertNotNull(results); @@ -123,10 +124,17 @@ public void testGetAllIssuesBasic() throws JsonProcessingException { JiraSourceConfig jiraSourceConfig = JiraServiceTest.createJiraConfiguration(BASIC, issueType, issueStatus, projectKey); JiraRestClient jiraRestClient = new JiraRestClient(restTemplate, authConfig); SearchResults mockSearchResults = mock(SearchResults.class); - when(authConfig.getUrl()).thenReturn("https://example.com"); + when(authConfig.getUrl()).thenReturn("https://example.com/"); doReturn(new ResponseEntity<>(mockSearchResults, HttpStatus.OK)).when(restTemplate).getForEntity(any(URI.class), any(Class.class)); SearchResults results = jiraRestClient.getAllIssues(jql, 0, jiraSourceConfig); assertNotNull(results); } + @Test + public void testRestApiAddressValidation() throws JsonProcessingException { + when(authConfig.getUrl()).thenReturn("https://224.0.0.1/"); + JiraRestClient jiraRestClient = new JiraRestClient(restTemplate, authConfig); + assertThrows(BadRequestException.class, () -> jiraRestClient.getIssue("TEST-1")); + } + } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java index 51694404b2..615237e8e5 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java @@ -21,7 +21,7 @@ public class LeaderScheduler implements Runnable { /** * Default duration to extend the timeout of lease */ - private static final int DEFAULT_EXTEND_LEASE_MINUTES = 3; + private static final Duration DEFAULT_EXTEND_LEASE_MINUTES = Duration.ofMinutes(3); /** * Default interval to run lease check and shard discovery @@ -68,7 +68,7 @@ public void run() { Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator); leaderProgressState.setLastPollTime(updatedPollTime); leaderPartition.setLeaderProgressState(leaderProgressState); - coordinator.saveProgressStateForPartition(leaderPartition, null); + coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES); } } catch (Exception e) { @@ -78,7 +78,7 @@ public void run() { // Extend the timeout // will always be a leader until shutdown try { - coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES)); + coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES); } catch (final Exception e) { LOG.error("Failed to save Leader partition state. This process will retry."); } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java index 8aa3bfab16..6390008b21 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java @@ -67,7 +67,7 @@ void testLeaderPartitionsCreation(boolean initializationState) throws Interrupte // Check if crawler was invoked and updated leader lease renewal time verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator); - verify(coordinator).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class)); + verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class)); } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java index 474355fd28..8fee799b84 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java @@ -120,7 +120,6 @@ void testExceptionWhileAcquiringWorkerPartition() throws InterruptedException { @Test void testWhenNoPartitionToWorkOn() throws InterruptedException { WorkerScheduler workerScheduler = new WorkerScheduler(buffer, coordinator, sourceConfig, crawler); - given(coordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE)).willReturn(Optional.empty()); ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(workerScheduler);