Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some fixes to jira source #5203

Merged
merged 8 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,6 @@ public JiraItemInfoBuilder withProject(String project) {
return this;
}

public JiraItemInfoBuilder withIssueType(String issueType) {
this.issueType = issueType;
return this;
}

public JiraItemInfoBuilder withIssueBean(IssueBean issue) {
Map<String, Object> issueMetadata = new HashMap<>();
issueMetadata.put(PROJECT_KEY, issue.getProject());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@

import javax.inject.Named;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

@Named
public class JiraIterator implements Iterator<ItemInfo> {

private static final int HAS_NEXT_TIMEOUT = 60;
private static final Logger log = LoggerFactory.getLogger(JiraIterator.class);
private final JiraSourceConfig sourceConfig;
private final JiraService service;
Expand All @@ -27,6 +31,7 @@ public class JiraIterator implements Iterator<ItemInfo> {
private Queue<ItemInfo> itemInfoQueue;
private Instant lastPollTime;
private boolean firstTime = true;
private List<Future<Boolean>> futureList = new ArrayList<>();

public JiraIterator(final JiraService service,
PluginExecutorServiceProvider executorServiceProvider,
Expand All @@ -40,12 +45,41 @@ public JiraIterator(final JiraService service,
public boolean hasNext() {
if (firstTime) {
log.trace("Crawling has been started");
itemInfoQueue = service.getJiraEntities(sourceConfig, lastPollTime);
startCrawlerThreads();
firstTime = false;
}
int timeout = HAS_NEXT_TIMEOUT;
while (isCrawlerRunning() && itemInfoQueue.isEmpty() && timeout > 0) {
try {
log.trace("Waiting for crawler queue to be filled for next {} seconds", timeout);
Thread.sleep(crawlerQWaitTimeMillis);
timeout--;
} catch (InterruptedException e) {
log.error("An exception has occurred while checking for the next document in crawling queue");
Thread.currentThread().interrupt();
}
}
return !this.itemInfoQueue.isEmpty();
}

private boolean isCrawlerRunning() {
boolean isRunning = false;
if (!futureList.isEmpty()) {
for (Future<Boolean> future : futureList) {
if (!future.isDone()) {
isRunning = true;
break;
}
}
}
return isRunning;
}

private void startCrawlerThreads() {
futureList.add(crawlerTaskExecutor.submit(() ->
service.getJiraEntities(sourceConfig, lastPollTime, itemInfoQueue), false));
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes look good but I do not see corresponding changes to the JavaIteratorTest.java file. When using futures testing is very important as we realized with Lambda code. Please make sure all these changes have corresponding tests

@Override
public ItemInfo next() {
if (hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,11 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.ISSUE_KEY;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.PROJECT_KEY;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.UPDATED;
import static org.opensearch.dataprepper.plugins.source.jira.utils.JqlConstants.CLOSING_ROUND_BRACKET;
import static org.opensearch.dataprepper.plugins.source.jira.utils.JqlConstants.DELIMITER;
Expand Down Expand Up @@ -65,12 +61,10 @@ public JiraService(JiraSourceConfig jiraSourceConfig, JiraRestClient jiraRestCli
* @param configuration the configuration.
* @param timestamp timestamp.
*/
public Queue<ItemInfo> getJiraEntities(JiraSourceConfig configuration, Instant timestamp) {
public void getJiraEntities(JiraSourceConfig configuration, Instant timestamp, Queue<ItemInfo> itemInfoQueue) {
log.trace("Started to fetch entities");
Queue<ItemInfo> itemInfoQueue = new ConcurrentLinkedQueue<>();
searchForNewTicketsAndAddToQueue(configuration, timestamp, itemInfoQueue);
log.trace("Creating item information and adding in queue");
return itemInfoQueue;
}

public String getIssue(String issueKey) {
Expand Down Expand Up @@ -170,21 +164,4 @@ private void validateProjectFilters(JiraSourceConfig configuration) {
}
}

/**
* Method for creating Item Info.
*
* @param key Input Parameter
* @param metadata Input Parameter
* @return Item Info
*/
private ItemInfo createItemInfo(String key, Map<String, Object> metadata) {
return JiraItemInfo.builder().withEventTime(Instant.now())
.withId((String) metadata.get(ISSUE_KEY))
.withItemId(key)
.withMetadata(metadata)
.withProject((String) metadata.get(PROJECT_KEY))
.withIssueType((String) metadata.get(CONTENT_TYPE))
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
import lombok.extern.slf4j.Slf4j;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.source.jira.JiraSourceConfig;
import org.opensearch.dataprepper.plugins.source.jira.exception.BadRequestException;
import org.opensearch.dataprepper.plugins.source.jira.exception.UnAuthorizedException;
import org.opensearch.dataprepper.plugins.source.jira.models.SearchResults;
import org.opensearch.dataprepper.plugins.source.jira.rest.auth.JiraAuthConfig;
import org.opensearch.dataprepper.plugins.source.jira.utils.AddressValidation;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.HttpClientErrorException;
Expand Down Expand Up @@ -93,8 +95,8 @@ public String getIssue(String issueKey) {
return invokeRestApi(uri, String.class).getBody();
}

private <T> ResponseEntity<T> invokeRestApi(URI uri, Class<T> responseType) {

private <T> ResponseEntity<T> invokeRestApi(URI uri, Class<T> responseType) throws BadRequestException{
AddressValidation.validateInetAddress(AddressValidation.getInetAddress(uri.toString()));
int retryCount = 0;
while (retryCount < RETRY_ATTEMPT) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -141,7 +142,8 @@ public void testGetJiraEntities() throws JsonProcessingException {
doReturn(mockSearchResults).when(jiraRestClient).getAllIssues(any(StringBuilder.class), anyInt(), any(JiraSourceConfig.class));

Instant timestamp = Instant.ofEpochSecond(0);
Queue<ItemInfo> itemInfoQueue = jiraService.getJiraEntities(jiraSourceConfig, timestamp);
Queue<ItemInfo> itemInfoQueue = new ConcurrentLinkedQueue<>();
jiraService.getJiraEntities(jiraSourceConfig, timestamp, itemInfoQueue);
assertEquals(mockIssues.size(), itemInfoQueue.size());
}

Expand All @@ -166,7 +168,8 @@ public void buildIssueItemInfoMultipleFutureThreads() throws JsonProcessingExcep
doReturn(mockSearchResults).when(jiraRestClient).getAllIssues(any(StringBuilder.class), anyInt(), any(JiraSourceConfig.class));

Instant timestamp = Instant.ofEpochSecond(0);
Queue<ItemInfo> itemInfoQueue = jiraService.getJiraEntities(jiraSourceConfig, timestamp);
Queue<ItemInfo> itemInfoQueue = new ConcurrentLinkedQueue<>();
jiraService.getJiraEntities(jiraSourceConfig, timestamp, itemInfoQueue);
assertTrue(itemInfoQueue.size() >= 100);
}

Expand All @@ -186,7 +189,9 @@ public void testBadProjectKeys() throws JsonProcessingException {
JiraService jiraService = new JiraService(jiraSourceConfig, jiraRestClient);

Instant timestamp = Instant.ofEpochSecond(0);
assertThrows(BadRequestException.class, () -> jiraService.getJiraEntities(jiraSourceConfig, timestamp));
Queue<ItemInfo> itemInfoQueue = new ConcurrentLinkedQueue<>();

assertThrows(BadRequestException.class, () -> jiraService.getJiraEntities(jiraSourceConfig, timestamp, itemInfoQueue));
}

@Test
Expand All @@ -201,7 +206,9 @@ public void testGetJiraEntitiesException() throws JsonProcessingException {
doThrow(RuntimeException.class).when(jiraRestClient).getAllIssues(any(StringBuilder.class), anyInt(), any(JiraSourceConfig.class));

Instant timestamp = Instant.ofEpochSecond(0);
assertThrows(RuntimeException.class, () -> jiraService.getJiraEntities(jiraSourceConfig, timestamp));
Queue<ItemInfo> itemInfoQueue = new ConcurrentLinkedQueue<>();

assertThrows(RuntimeException.class, () -> jiraService.getJiraEntities(jiraSourceConfig, timestamp, itemInfoQueue));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void testGetAllIssuesOauth2() throws JsonProcessingException {
JiraSourceConfig jiraSourceConfig = JiraServiceTest.createJiraConfiguration(OAUTH2, issueType, issueStatus, projectKey);
JiraRestClient jiraRestClient = new JiraRestClient(restTemplate, authConfig);
SearchResults mockSearchResults = mock(SearchResults.class);
doReturn("http://mock-service.jira.com").when(authConfig).getUrl();
doReturn("http://mock-service.jira.com/").when(authConfig).getUrl();
doReturn(new ResponseEntity<>(mockSearchResults, HttpStatus.OK)).when(restTemplate).getForEntity(any(URI.class), any(Class.class));
SearchResults results = jiraRestClient.getAllIssues(jql, 0, jiraSourceConfig);
assertNotNull(results);
Expand All @@ -123,7 +123,7 @@ public void testGetAllIssuesBasic() throws JsonProcessingException {
JiraSourceConfig jiraSourceConfig = JiraServiceTest.createJiraConfiguration(BASIC, issueType, issueStatus, projectKey);
JiraRestClient jiraRestClient = new JiraRestClient(restTemplate, authConfig);
SearchResults mockSearchResults = mock(SearchResults.class);
when(authConfig.getUrl()).thenReturn("https://example.com");
when(authConfig.getUrl()).thenReturn("https://example.com/");
doReturn(new ResponseEntity<>(mockSearchResults, HttpStatus.OK)).when(restTemplate).getForEntity(any(URI.class), any(Class.class));
SearchResults results = jiraRestClient.getAllIssues(jql, 0, jiraSourceConfig);
assertNotNull(results);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class LeaderScheduler implements Runnable {
/**
* Default duration to extend the timeout of lease
*/
private static final int DEFAULT_EXTEND_LEASE_MINUTES = 3;
private static final Duration DEFAULT_EXTEND_LEASE_MINUTES = Duration.ofMinutes(3);

/**
* Default interval to run lease check and shard discovery
Expand Down Expand Up @@ -68,7 +68,7 @@ public void run() {
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
leaderProgressState.setLastPollTime(updatedPollTime);
leaderPartition.setLeaderProgressState(leaderProgressState);
coordinator.saveProgressStateForPartition(leaderPartition, null);
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
}

} catch (Exception e) {
Expand All @@ -78,7 +78,7 @@ public void run() {
// Extend the timeout
// will always be a leader until shutdown
try {
coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES));
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
} catch (final Exception e) {
LOG.error("Failed to save Leader partition state. This process will retry.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void testLeaderPartitionsCreation(boolean initializationState) throws Interrupte

// Check if crawler was invoked and updated leader lease renewal time
verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator);
verify(coordinator).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ void testExceptionWhileAcquiringWorkerPartition() throws InterruptedException {
@Test
void testWhenNoPartitionToWorkOn() throws InterruptedException {
WorkerScheduler workerScheduler = new WorkerScheduler(buffer, coordinator, sourceConfig, crawler);
given(coordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE)).willReturn(Optional.empty());

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(workerScheduler);
Expand Down
Loading