Skip to content

Commit

Permalink
SaaS Crawler Module (opensearch-project#5095)
Browse files Browse the repository at this point in the history
* Introducing SaaS sources gradle module and SaaS crawler as a common module for all of the gradle sources

Signed-off-by: Santhosh Gandhe <[email protected]>

* test classes

Signed-off-by: Santhosh Gandhe <[email protected]>

* Plain empty Jira Source plugin

Signed-off-by: Santhosh Gandhe <[email protected]>

* Partition Factory Tests

Signed-off-by: Santhosh Gandhe <[email protected]>

* additional tests

Signed-off-by: Santhosh Gandhe <[email protected]>

* additional tests

Signed-off-by: Santhosh Gandhe <[email protected]>

* full test coverage for base folder, spotless fixes

Signed-off-by: Maxwell Brown <[email protected]>

* additional tests

Signed-off-by: Santhosh Gandhe <[email protected]>

* additional test coverage

Signed-off-by: Santhosh Gandhe <[email protected]>

* addressing review comments and also package name refactoring based on the review input

Signed-off-by: Santhosh Gandhe <[email protected]>

* more review comments

Signed-off-by: Santhosh Gandhe <[email protected]>

* adjusted the log level and removed unwanted log messages

Signed-off-by: Santhosh Gandhe <[email protected]>

* small clean ups

Signed-off-by: Santhosh Gandhe <[email protected]>

* test case assertion fix

Signed-off-by: Santhosh Gandhe <[email protected]>

* better coverage

Signed-off-by: Santhosh Gandhe <[email protected]>

* step down the log level based on the review comments

Signed-off-by: Santhosh Gandhe <[email protected]>

* taking the coverage to 100%

Signed-off-by: Santhosh Gandhe <[email protected]>

* addressing review comments

Signed-off-by: Santhosh Gandhe <[email protected]>

* module name renamed to source-crawler

Signed-off-by: Santhosh Gandhe <[email protected]>

converting last_poll_time to java Instant type

Signed-off-by: Santhosh Gandhe <[email protected]>

we are now capturing Crawling times

Signed-off-by: Santhosh Gandhe <[email protected]>

ItemInfo long timestamp is now using Instant type

Signed-off-by: Santhosh Gandhe <[email protected]>

addressing review comments

Signed-off-by: Santhosh Gandhe <[email protected]>

Instant conversion

Signed-off-by: Santhosh Gandhe <[email protected]>

addressing review comments

Signed-off-by: Santhosh Gandhe <[email protected]>

code formatting

Signed-off-by: Santhosh Gandhe <[email protected]>

removed long polling by enabling setter on the leader scheduler timer

Signed-off-by: Santhosh Gandhe <[email protected]>

reducing wait times

Signed-off-by: Santhosh Gandhe <[email protected]>

---------

Signed-off-by: Santhosh Gandhe <[email protected]>
Signed-off-by: Maxwell Brown <[email protected]>
Co-authored-by: Maxwell Brown <[email protected]>
  • Loading branch information
san81 and Galactus22625 authored Oct 28, 2024
1 parent 37c8326 commit 675864d
Show file tree
Hide file tree
Showing 34 changed files with 1,903 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@ data-prepper-main/src/test/resources/logstash-conf/logstash-filter.yaml

# Python virtual environments
.venv

# output folder created when we run test cases
**/out/
2 changes: 1 addition & 1 deletion data-prepper-plugins/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ subprojects {
}

// Exposes the plugin subprojects as api dependencies of this aggregate project.
dependencies {
// Addresses each subproject by name directly under ':data-prepper-plugins:'.
subprojects.forEach { api project(':data-prepper-plugins:' + it.name) }
// Addresses each subproject by its own project path instead of a hand-built string.
// findAll is used only to run the closure per subproject; its result is discarded.
subprojects.findAll { api project(it.path) }
}
13 changes: 13 additions & 0 deletions data-prepper-plugins/saas-source-plugins/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Aggregate build script for the SaaS source plugin modules.
plugins {
    id 'java-library'
}


// Settings applied to every module nested under this project.
subprojects {
    apply plugin: 'data-prepper.publish'
    group = 'org.opensearch.dataprepper.plugins.source'
}

dependencies {
    // Re-export each nested module through this aggregate project. Using the
    // subproject's own path avoids assembling the project path string by hand.
    subprojects.forEach { api project(it.path) }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

# Metrics

### Counter
- `issuesRequested`: measures the total number of issue requests sent.

### Timer
- `requestProcessDuration`: measures the latency of requests processed by the Jira source plugin.
30 changes: 30 additions & 0 deletions data-prepper-plugins/saas-source-plugins/jira-source/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Build script for the Jira source plugin module.
plugins {
id 'java'
}

dependencies {

// Shared crawler module whose CrawlerClient/Crawler types this plugin uses.
implementation project(path: ':data-prepper-plugins:saas-source-plugins:source-crawler')
implementation project(path: ':data-prepper-api')
implementation project(path: ':data-prepper-plugins:aws-plugin-api')
implementation project(path: ':data-prepper-plugins:buffer-common')
implementation project(path: ':data-prepper-plugins:common')

implementation libs.commons.io
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'javax.inject:javax.inject:1'
// spring-web version is taken from the shared version catalog.
implementation("org.springframework:spring-web:${libs.versions.spring.get()}")

// Lombok is declared both on the implementation classpath and as an annotation processor.
// NOTE(review): Lombok is usually declared compileOnly rather than implementation — confirm before changing.
implementation 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'

// Spring context without its transitive commons-logging dependency.
implementation(libs.spring.context) {
exclude group: 'commons-logging', module: 'commons-logging'
}
}

// Run this module's tests on the JUnit Platform.
test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.opensearch.dataprepper.plugins.source.saas.jira;

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.SaasSourceConfig;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Named;
import java.time.Instant;
import java.util.Iterator;

/**
 * Jira implementation of {@link CrawlerClient}.
 *
 * <p>This is currently a skeleton: it remembers the last poll time handed to it,
 * reports no items, and only logs when asked to execute a partition.
 */
@Named
public class JiraClient implements CrawlerClient {

    private static final Logger log = LoggerFactory.getLogger(JiraClient.class);
    private Instant lastPollTime;

    public JiraClient() {
    }

    /**
     * Lists the items available for crawling.
     *
     * @return an empty iterator; never {@code null}, so callers can iterate
     *         without a null check
     */
    @Override
    public Iterator<ItemInfo> listItems() {
        // Returning null here would make the crawler fail on its first hasNext() call.
        return java.util.Collections.emptyIterator();
    }

    /**
     * Records the last time the service was polled for changes.
     *
     * @param lastPollTime the instant of the previous poll
     */
    @Override
    public void setLastPollTime(Instant lastPollTime) {
        log.info("Setting the lastPollTime: {}", lastPollTime);
        this.lastPollTime = lastPollTime;
    }

    /**
     * Executes one partition of work. The current implementation only logs.
     *
     * @param state         progress state describing the partition
     * @param buffer        pipeline buffer that results would be written into
     * @param configuration source configuration
     */
    @Override
    public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, SaasSourceConfig configuration) {
        log.info("Logic for executing the partitions");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.opensearch.dataprepper.plugins.source.saas.jira;


import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.plugins.source.source_crawler.SaasCrawlerApplicationContextMarker;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.Crawler;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.SaasPluginExecutorServiceProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Entry point of the Jira source plugin, registered under the plugin name {@code jira}.
 *
 * <p>This is currently a skeleton: construction and start-up only log, and
 * {@link #stop()} does nothing.
 */

@DataPrepperPlugin(name = "jira",
        pluginType = Source.class,
        packagesToScan = {SaasCrawlerApplicationContextMarker.class, JiraSource.class}
)
public class JiraSource implements Source<Record<Event>> {

    private static final Logger log = LoggerFactory.getLogger(JiraSource.class);


    /**
     * Plugin constructor. None of the arguments are retained yet.
     *
     * @param pluginMetrics             metrics handle for this plugin
     * @param pluginFactory             plugin factory
     * @param acknowledgementSetManager acknowledgement set manager
     * @param crawler                   crawler component
     * @param executorServiceProvider   provider of the executor service
     */
    @DataPrepperPluginConstructor
    public JiraSource(final PluginMetrics pluginMetrics,
                      final PluginFactory pluginFactory,
                      final AcknowledgementSetManager acknowledgementSetManager,
                      final Crawler crawler,
                      final SaasPluginExecutorServiceProvider executorServiceProvider) {
        log.info("Create Jira Source Connector");
    }

    /**
     * Starts the source. The current implementation only logs.
     *
     * @param buffer pipeline buffer that records would be written into
     */
    @Override
    public void start(Buffer<Record<Event>> buffer) {
        log.info("Starting Jira Source Plugin... ");
    }

    /**
     * Stops the source. Nothing is running yet, so there is nothing to stop.
     */
    @Override
    public void stop() {

    }

    /**
     * Returns the decoder supplied by the {@link Source} default implementation.
     *
     * @return the default decoder from {@link Source}
     */
    @Override
    public ByteDecoder getDecoder() {
        return Source.super.getDecoder();
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Build script for the shared source-crawler module.
group = 'org.opensearch.dataprepper.plugins.source.source_crawler'

// Disables every Javadoc task in this module.
tasks.withType(Javadoc).configureEach {
enabled = false
}

dependencies {

implementation project(path: ':data-prepper-api')

implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
// NOTE(review): Lombok is usually declared compileOnly rather than implementation — confirm before changing.
implementation 'org.projectlombok:lombok:1.18.30'
implementation 'javax.inject:javax.inject:1'
implementation 'javax.annotation:javax.annotation-api:1.3.2'

// Spring context without its transitive commons-logging dependency.
implementation(libs.spring.context) {
exclude group: 'commons-logging', module: 'commons-logging'
}

// NOTE(review): jsr310 is already an implementation dependency above, so this test-scoped entry looks redundant — confirm.
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
annotationProcessor 'org.projectlombok:lombok:1.18.30'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.opensearch.dataprepper.plugins.source.source_crawler;

/**
 * Marker interface to indicate the base package to scan for dependency injection.
 * It declares no methods.
 */
public interface SaasCrawlerApplicationContextMarker {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.base;

import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Named;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Walks the items reported by a {@link CrawlerClient} and groups them into
 * partitions of work that are registered with an {@link EnhancedSourceCoordinator}.
 */
@Named
public class Crawler {
    private static final Logger log = LoggerFactory.getLogger(Crawler.class);
    // Upper bound on how many items are pulled from the iterator for one partition.
    private static final int maxItemsPerPage = 20;
    private final Timer crawlingTime;
    private final PluginMetrics pluginMetrics =
            PluginMetrics.fromNames("sourceCrawler", "crawler");

    private final CrawlerClient client;

    /**
     * @param client the client used to list items and to execute partitions
     */
    public Crawler(CrawlerClient client) {
        this.client = client;
        this.crawlingTime = pluginMetrics.timer("crawlingTime");
    }

    /**
     * Lists the client's items and creates one partition per page of up to
     * {@code maxItemsPerPage} items. The elapsed time is recorded on the
     * {@code crawlingTime} timer.
     *
     * @param lastPollTime the instant of the previous poll; passed on to the client
     * @param coordinator  coordinator the partitions are created on
     * @return the latest last-modified time among the crawled items, or the epoch
     *         ({@code Instant.ofEpochMilli(0)}) when no item was crawled
     */
    public Instant crawl(Instant lastPollTime,
                         EnhancedSourceCoordinator coordinator) {
        long startTime = System.currentTimeMillis();
        client.setLastPollTime(lastPollTime);
        Iterator<ItemInfo> itemInfoIterator = client.listItems();
        if (itemInfoIterator == null) {
            // A client with nothing to report may hand back null; treat that as
            // "no items" instead of failing on the first hasNext() call below.
            log.warn("Client returned a null item iterator; treating it as empty.");
            itemInfoIterator = java.util.Collections.emptyIterator();
        }
        log.info("Starting to crawl the source with lastPollTime: {}", lastPollTime);
        Instant updatedPollTime = Instant.ofEpochMilli(0);
        do {
            final List<ItemInfo> itemInfoList = new ArrayList<>();
            // Note: a skipped null item still counts toward the page size.
            for (int i = 0; i < maxItemsPerPage && itemInfoIterator.hasNext(); i++) {
                ItemInfo nextItem = itemInfoIterator.next();
                if (nextItem == null) {
                    //we don't expect null items, but just in case, we'll skip them
                    log.info("Unexpected encounter of a null item.");
                    continue;
                }
                itemInfoList.add(nextItem);
                Instant lastModifiedTime = nextItem.getLastModifiedAt();
                // Keep the most recent modification time seen so far.
                updatedPollTime = updatedPollTime.isAfter(lastModifiedTime) ? updatedPollTime : lastModifiedTime;
            }
            createPartition(itemInfoList, coordinator);
        } while (itemInfoIterator.hasNext());
        long crawlTimeMillis = System.currentTimeMillis() - startTime;
        log.debug("Crawling completed in {} ms", crawlTimeMillis);
        crawlingTime.record(crawlTimeMillis, TimeUnit.MILLISECONDS);
        return updatedPollTime;
    }

    /**
     * Delegates execution of a partition to the client.
     *
     * @param state        progress state describing the partition
     * @param buffer       pipeline buffer passed through to the client
     * @param sourceConfig source configuration passed through to the client
     */
    public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, SaasSourceConfig sourceConfig) {
        client.executePartition(state, buffer, sourceConfig);
    }

    /**
     * Builds a progress state for one page of items and registers it as a partition.
     * The partition key and key attributes are taken from the first item of the page.
     * An empty page creates nothing.
     */
    private void createPartition(List<ItemInfo> itemInfoList, EnhancedSourceCoordinator coordinator) {
        if (itemInfoList.isEmpty()) {
            return;
        }
        ItemInfo itemInfo = itemInfoList.get(0);
        String partitionKey = itemInfo.getPartitionKey();
        List<String> itemIds = itemInfoList.stream().map(ItemInfo::getId).collect(Collectors.toList());
        SaasWorkerProgressState state = new SaasWorkerProgressState();
        state.setKeyAttributes(itemInfo.getKeyAttributes());
        state.setItemIds(itemIds);
        state.setExportStartTime(Instant.now());
        state.setLoadedItems(itemInfoList.size());
        SaasSourcePartition sourcePartition = new SaasSourcePartition(state, partitionKey);
        coordinator.createPartition(sourcePartition);
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.base;

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;

import java.time.Instant;
import java.util.Iterator;

/**
 * Interface for Crawler client. This interface can be implemented by different saas clients.
 * For example, Jira, Salesforce, ServiceNow, etc.
 */
public interface CrawlerClient {


/**
 * This will be the main API called by crawler. This method assumes that {@link
 * SaasSourceConfig} is available as a member to {@link CrawlerClient}, as a result of
 * which, other scanning properties will also be available to this method
 *
 * @return returns an {@link Iterator} of {@link ItemInfo}
 */
Iterator<ItemInfo> listItems();


/**
 * Method to set the last time we polled the service to check for any changes.
 *
 * @param lastPollTime the {@link Instant} of the last poll
 */
void setLastPollTime(Instant lastPollTime);

/**
 * Method for executing a particular partition or a chunk of work
 *
 * @param state        worker node state holds the details of this particular chunk of work
 * @param buffer       pipeline buffer to write the results into
 * @param sourceConfig pipeline configuration from the yaml
 */
void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, SaasSourceConfig sourceConfig);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.base;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.PreDestroy;
import javax.inject.Named;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Holds a single {@link ExecutorService} and shuts it down when the
 * {@link PreDestroy} callback runs.
 */
@Named
public class SaasPluginExecutorServiceProvider {
    private static final Logger log = LoggerFactory.getLogger(SaasPluginExecutorServiceProvider.class);
    private static final int DEFAULT_THREAD_COUNT = 50;
    private final ExecutorService executorService;

    /**
     * Creates a provider backed by a fixed pool of {@code DEFAULT_THREAD_COUNT} threads.
     */
    public SaasPluginExecutorServiceProvider() {
        executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT);
    }

    /**
     * Constructor for testing
     *
     * @param testExecutorService the executor service to hand out instead of the default pool
     */
    public SaasPluginExecutorServiceProvider(ExecutorService testExecutorService) {
        executorService = testExecutorService;
    }

    /**
     * @return the executor service held by this provider
     */
    public ExecutorService get() {
        return executorService;
    }

    /**
     * Requests an orderly shutdown, waits up to 30 seconds for running tasks to
     * finish, and then always calls {@code shutdownNow()} to stop whatever is left.
     * If the wait is interrupted, the thread's interrupt flag is restored.
     */
    @PreDestroy
    public void terminateExecutor() {
        try {
            log.debug("Shutting down ExecutorService {}", executorService);
            executorService.shutdown();
            boolean isExecutorTerminated = executorService
                    .awaitTermination(30, TimeUnit.SECONDS);
            log.debug("ExecutorService terminated : {}", isExecutorTerminated);
        } catch (InterruptedException e) {
            // The trailing throwable argument makes SLF4J log the stack trace as well.
            log.error("Interrupted while terminating executor : {}", e.getMessage(), e);
            Thread.currentThread().interrupt();
        } finally {
            executorService.shutdownNow();
        }
    }
}
Loading

0 comments on commit 675864d

Please sign in to comment.