From 675864d120e8f88deeee2b341254353c827e06de Mon Sep 17 00:00:00 2001 From: Santhosh Gandhe <1909520+san81@users.noreply.github.com> Date: Mon, 28 Oct 2024 10:36:49 -0700 Subject: [PATCH] SaaS Crawler Module (#5095) * Introducing SaaS sources gradle module and SaaS crawler as a common module for all of the SaaS sources Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * test classes Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * Plain empty Jira Source plugin Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * Partition Factory Tests Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * additional tests Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * additional tests Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * full test coverage for base folder, spotless fixes Signed-off-by: Maxwell Brown * additional tests Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * additional test coverage Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * addressing review comments and also package name refactoring based on the review input Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * more review comments Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * adjusted the log level and removed unwanted log messages Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * small clean ups Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * test case assertion fix Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * better coverage Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * step down the log level based on the review comments Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * taking the coverage to 100% Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * addressing review comments Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * module name renamed to source-crawler Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> converting last_poll_time to Java Instant type Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> we are now capturing crawling times Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> ItemInfo long timestamp is now using Instant type Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> addressing review comments Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> Instant conversion Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> addressing review comments Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> code formatting Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> removed long polling by enabling setter on the leader scheduler timer Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> reducing wait times Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> --------- Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> Signed-off-by: Maxwell Brown Co-authored-by: Maxwell Brown --- .gitignore | 3 + data-prepper-plugins/build.gradle | 2 +- .../saas-source-plugins/build.gradle | 13 ++ .../saas-source-plugins/jira-source/README.md | 8 + .../jira-source/build.gradle | 30 ++
.../plugins/source/saas/jira/JiraClient.java | 45 +++++ .../plugins/source/saas/jira/JiraSource.java | 57 ++++++ .../source-crawler/build.gradle | 26 +++ .../SaasCrawlerApplicationContextMarker.java | 7 + .../source/source_crawler/base/Crawler.java | 86 ++++++++++ .../source_crawler/base/CrawlerClient.java | 44 +++++ .../SaasPluginExecutorServiceProvider.java | 48 ++++++ .../source_crawler/base/SaasSourceConfig.java | 9 + .../source_crawler/base/SaasSourcePlugin.java | 110 ++++++++++++ .../coordination/PartitionFactory.java | 40 +++++ .../partition/LeaderPartition.java | 82 +++++++++ .../partition/SaasSourcePartition.java | 55 ++++++ .../scheduler/LeaderScheduler.java | 101 +++++++++++ .../scheduler/WorkerScheduler.java | 87 ++++++++++ .../state/LeaderProgressState.java | 26 +++ .../state/SaasWorkerProgressState.java | 38 ++++ .../source/source_crawler/model/ItemInfo.java | 53 ++++++ .../util/CustomInstantDeserializer.java | 16 ++ .../source_crawler/base/CrawlerTest.java | 162 ++++++++++++++++++ ...SaasPluginExecutorServiceProviderTest.java | 69 ++++++++ .../base/SaasSourcePluginTest.java | 138 +++++++++++++++ .../coordination/PartitionFactoryTest.java | 104 +++++++++++ .../scheduler/LeaderSchedulerTest.java | 115 +++++++++++++ .../scheduler/WorkerSchedulerTest.java | 151 ++++++++++++++++ .../state/LeaderProgressStateTest.java | 34 ++++ .../state/SaasWorkerProgressStateTest.java | 44 +++++ .../source_crawler/model/ItemInfoTest.java | 41 +++++ .../source_crawler/model/TestItemInfo.java | 56 ++++++ settings.gradle | 4 + 34 files changed, 1903 insertions(+), 1 deletion(-) create mode 100644 data-prepper-plugins/saas-source-plugins/build.gradle create mode 100644 data-prepper-plugins/saas-source-plugins/jira-source/README.md create mode 100644 data-prepper-plugins/saas-source-plugins/jira-source/build.gradle create mode 100644 data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/saas/jira/JiraClient.java create mode 100644 data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/saas/jira/JiraSource.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/build.gradle create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/SaasCrawlerApplicationContextMarker.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerClient.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasPluginExecutorServiceProvider.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasSourceConfig.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasSourcePlugin.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/PartitionFactory.java create mode 100644 
data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/LeaderPartition.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/SaasSourcePartition.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/LeaderProgressState.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/SaasWorkerProgressState.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/model/ItemInfo.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/util/CustomInstantDeserializer.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasPluginExecutorServiceProviderTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasSourcePluginTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/PartitionFactoryTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/LeaderProgressStateTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/SaasWorkerProgressStateTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/model/ItemInfoTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/model/TestItemInfo.java diff --git a/.gitignore b/.gitignore index 6bf57412d2..4247753be3 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,6 @@ data-prepper-main/src/test/resources/logstash-conf/logstash-filter.yaml # Python virtual environments .venv 
+ +# output folder created when we run test cases +**/out/ diff --git a/data-prepper-plugins/build.gradle b/data-prepper-plugins/build.gradle index 6e60c970cc..2ad38c2788 100644 --- a/data-prepper-plugins/build.gradle +++ b/data-prepper-plugins/build.gradle @@ -13,5 +13,5 @@ subprojects { } dependencies { - subprojects.forEach { api project(':data-prepper-plugins:' + it.name) } + subprojects.findAll { api project(it.path) } } diff --git a/data-prepper-plugins/saas-source-plugins/build.gradle b/data-prepper-plugins/saas-source-plugins/build.gradle new file mode 100644 index 0000000000..635d6a3cad --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/build.gradle @@ -0,0 +1,13 @@ +plugins { + id 'java-library' +} + + +subprojects { + apply plugin: 'data-prepper.publish' + group = 'org.opensearch.dataprepper.plugins.source' +} + +dependencies { + subprojects.forEach { api project(':data-prepper-plugins::saas-source-plugins:' + it.name) } +} \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/README.md b/data-prepper-plugins/saas-source-plugins/jira-source/README.md new file mode 100644 index 0000000000..f2a1148a2e --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/jira-source/README.md @@ -0,0 +1,8 @@ + +# Metrics + +### Counter +- `issuesRequested`: measures total number of issue Requests sent. + +### Timer +- `requestProcessDuration`: measures latency of requests processed by the jira source plugin. diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/build.gradle b/data-prepper-plugins/saas-source-plugins/jira-source/build.gradle new file mode 100644 index 0000000000..9979e0151d --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/jira-source/build.gradle @@ -0,0 +1,30 @@ +plugins { + id 'java' +} + +dependencies { + + implementation project(path: ':data-prepper-plugins:saas-source-plugins:source-crawler') + implementation project(path: ':data-prepper-api') + implementation project(path: ':data-prepper-plugins:aws-plugin-api') + implementation project(path: ':data-prepper-plugins:buffer-common') + implementation project(path: ':data-prepper-plugins:common') + + implementation libs.commons.io + implementation 'io.micrometer:micrometer-core' + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'javax.inject:javax.inject:1' + implementation("org.springframework:spring-web:${libs.versions.spring.get()}") + + implementation 'org.projectlombok:lombok:1.18.30' + annotationProcessor 'org.projectlombok:lombok:1.18.30' + + implementation(libs.spring.context) { + exclude group: 'commons-logging', module: 'commons-logging' + } +} + +test { + useJUnitPlatform() +} \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/saas/jira/JiraClient.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/saas/jira/JiraClient.java new file mode 100644 index 0000000000..e2cea69e29 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/saas/jira/JiraClient.java @@ -0,0 +1,45 @@ +package org.opensearch.dataprepper.plugins.source.saas.jira; + +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import 
org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.SaasSourceConfig; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState; +import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Named; +import java.time.Instant; +import java.util.Iterator; + +/** + * This class represents a Jira client. + */ +@Named +public class JiraClient implements CrawlerClient { + + private static final Logger log = LoggerFactory.getLogger(JiraClient.class); + private Instant lastPollTime; + + public JiraClient() { + } + + + @Override + public Iterator listItems() { + return null; + } + + @Override + public void setLastPollTime(Instant lastPollTime) { + log.info("Setting the lastPollTime: {}", lastPollTime); + this.lastPollTime = lastPollTime; + } + + @Override + public void executePartition(SaasWorkerProgressState state, Buffer> buffer, SaasSourceConfig configuration) { + log.info("Logic for executing the partitions"); + } +} diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/saas/jira/JiraSource.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/saas/jira/JiraSource.java new file mode 100644 index 0000000000..e5841d7ede --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/saas/jira/JiraSource.java @@ -0,0 +1,57 @@ +package org.opensearch.dataprepper.plugins.source.saas.jira; + + +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.plugins.source.source_crawler.SaasCrawlerApplicationContextMarker; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.Crawler; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.SaasPluginExecutorServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * JiraConnector connector entry point. + */ + +@DataPrepperPlugin(name = "jira", + pluginType = Source.class, + packagesToScan = {SaasCrawlerApplicationContextMarker.class, JiraSource.class} +) +public class JiraSource implements Source> { + + private static final Logger log = LoggerFactory.getLogger(JiraSource.class); + + + @DataPrepperPluginConstructor + public JiraSource(final PluginMetrics pluginMetrics, + final PluginFactory pluginFactory, + final AcknowledgementSetManager acknowledgementSetManager, + Crawler crawler, + SaasPluginExecutorServiceProvider executorServiceProvider) { + log.info("Create Jira Source Connector"); + } + + public void start(Buffer> buffer) { + log.info("Starting Jira Source Plugin... 
"); + } + + @Override + public void stop() { + + } + + @Override + public ByteDecoder getDecoder() { + return Source.super.getDecoder(); + } + +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/build.gradle b/data-prepper-plugins/saas-source-plugins/source-crawler/build.gradle new file mode 100644 index 0000000000..6e46708c7e --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/build.gradle @@ -0,0 +1,26 @@ +group = 'org.opensearch.dataprepper.plugins.source.source_crawler' + +tasks.withType(Javadoc).configureEach { + enabled = false +} + +dependencies { + + implementation project(path: ':data-prepper-api') + + implementation 'io.micrometer:micrometer-core' + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' + implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8' + implementation 'org.projectlombok:lombok:1.18.30' + implementation 'javax.inject:javax.inject:1' + implementation 'javax.annotation:javax.annotation-api:1.3.2' + + implementation(libs.spring.context) { + exclude group: 'commons-logging', module: 'commons-logging' + } + + testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' + annotationProcessor 'org.projectlombok:lombok:1.18.30' +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/SaasCrawlerApplicationContextMarker.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/SaasCrawlerApplicationContextMarker.java new file mode 100644 index 0000000000..cbfafd7738 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/SaasCrawlerApplicationContextMarker.java @@ -0,0 +1,7 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler; + +/** + * Market interface to indicate the base package to scan for dependency injection + */ +public interface SaasCrawlerApplicationContextMarker { +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java new file mode 100644 index 0000000000..1e39aa085b --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java @@ -0,0 +1,86 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.base; + +import io.micrometer.core.instrument.Timer; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState; +import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import javax.inject.Named; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@Named +public class Crawler { + private static final Logger log = LoggerFactory.getLogger(Crawler.class); + private static final int maxItemsPerPage = 20; + private final Timer crawlingTime; + private final PluginMetrics pluginMetrics = + PluginMetrics.fromNames("sourceCrawler", "crawler"); + + private final CrawlerClient client; + + public Crawler(CrawlerClient client) { + this.client = client; + this.crawlingTime = pluginMetrics.timer("crawlingTime"); + } + + public Instant crawl(Instant lastPollTime, + EnhancedSourceCoordinator coordinator) { + long startTime = System.currentTimeMillis(); + client.setLastPollTime(lastPollTime); + Iterator itemInfoIterator = client.listItems(); + log.info("Starting to crawl the source with lastPollTime: {}", lastPollTime); + Instant updatedPollTime = Instant.ofEpochMilli(0); + do { + final List itemInfoList = new ArrayList<>(); + for (int i = 0; i < maxItemsPerPage && itemInfoIterator.hasNext(); i++) { + ItemInfo nextItem = itemInfoIterator.next(); + if (nextItem == null) { + //we don't expect null items, but just in case, we'll skip them + log.info("Unexpected encounter of a null item."); + continue; + } + itemInfoList.add(nextItem); + Instant lastModifiedTime = nextItem.getLastModifiedAt(); + updatedPollTime = updatedPollTime.isAfter(lastModifiedTime) ? updatedPollTime : lastModifiedTime; + } + createPartition(itemInfoList, coordinator); + } while (itemInfoIterator.hasNext()); + long crawlTimeMillis = System.currentTimeMillis() - startTime; + log.debug("Crawling completed in {} ms", crawlTimeMillis); + crawlingTime.record(crawlTimeMillis, TimeUnit.MILLISECONDS); + return updatedPollTime; + } + + public void executePartition(SaasWorkerProgressState state, Buffer> buffer, SaasSourceConfig sourceConfig) { + client.executePartition(state, buffer, sourceConfig); + } + + private void createPartition(List itemInfoList, EnhancedSourceCoordinator coordinator) { + if (itemInfoList.isEmpty()) { + return; + } + ItemInfo itemInfo = itemInfoList.get(0); + String partitionKey = itemInfo.getPartitionKey(); + List itemIds = itemInfoList.stream().map(ItemInfo::getId).collect(Collectors.toList()); + SaasWorkerProgressState state = new SaasWorkerProgressState(); + state.setKeyAttributes(itemInfo.getKeyAttributes()); + state.setItemIds(itemIds); + state.setExportStartTime(Instant.now()); + state.setLoadedItems(itemInfoList.size()); + SaasSourcePartition sourcePartition = new SaasSourcePartition(state, partitionKey); + coordinator.createPartition(sourcePartition); + } + +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerClient.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerClient.java new file mode 100644 index 0000000000..8e284c5014 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerClient.java @@ -0,0 +1,44 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.base; + +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import 
org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState; +import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; + +import java.time.Instant; +import java.util.Iterator; + +/** + * Interface for a Crawler client. This interface can be implemented by different SaaS clients. + * For example, Jira, Salesforce, ServiceNow, etc. + */ +public interface CrawlerClient { + + + /** + * This will be the main API called by the crawler. This method assumes that {@link + * SaasSourceConfig} is available as a member of {@link CrawlerClient}, so the other + * scanning properties will also be available to this method + * + * @return returns an {@link Iterator} of {@link ItemInfo} + */ + Iterator listItems(); + + + /** + * Method to set the last time we polled the service to check for any changes. + * + * @param lastPollTime the last poll time as an {@link Instant} + */ + void setLastPollTime(Instant lastPollTime); + + /** + * Method for executing a particular partition or a chunk of work + * + * @param state worker node state that holds the details of this particular chunk of work + * @param buffer pipeline buffer to write the results into + * @param sourceConfig pipeline configuration from the YAML + */ + void executePartition(SaasWorkerProgressState state, Buffer> buffer, SaasSourceConfig sourceConfig); +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasPluginExecutorServiceProvider.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasPluginExecutorServiceProvider.java new file mode 100644 index 0000000000..84a587bb5a --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasPluginExecutorServiceProvider.java @@ -0,0 +1,48 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.base; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.PreDestroy; +import javax.inject.Named; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +@Named +public class SaasPluginExecutorServiceProvider { + private static final Logger log = LoggerFactory.getLogger(SaasPluginExecutorServiceProvider.class); + private static final int DEFAULT_THREAD_COUNT = 50; + private final ExecutorService executorService; + + public SaasPluginExecutorServiceProvider() { + executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT); + } + + /** + * Constructor for testing + */ + public SaasPluginExecutorServiceProvider(ExecutorService testExecutorService) { + executorService = testExecutorService; + } + + public ExecutorService get() { + return executorService; + } + + @PreDestroy + public void terminateExecutor() { + try { + log.debug("Shutting down ExecutorService " + executorService); + executorService.shutdown(); + boolean isExecutorTerminated = executorService + .awaitTermination(30, TimeUnit.SECONDS); + log.debug("ExecutorService terminated : " + isExecutorTerminated); + } catch (InterruptedException e) { + log.error("Interrupted while terminating executor : " + e.getMessage()); + Thread.currentThread().interrupt(); + } finally { + executorService.shutdownNow(); + } + } +} \ No newline at end of file diff --git 
a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasSourceConfig.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasSourceConfig.java new file mode 100644 index 0000000000..bb6ab49ef2 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasSourceConfig.java @@ -0,0 +1,9 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.base; + +/** + * Marker interface for all SaaS connector configurations + */ +public interface SaasSourceConfig { + + int DEFAULT_NUMBER_OF_WORKERS = 1; +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasSourcePlugin.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasSourcePlugin.java new file mode 100644 index 0000000000..d6007989af --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasSourcePlugin.java @@ -0,0 +1,110 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.base; + + +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.UsesEnhancedSourceCoordination; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.PartitionFactory; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.LeaderScheduler; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; + + +/** + * Abstract base class and entry point for all SaaS source plugins. 
+ */ + +public abstract class SaasSourcePlugin implements Source>, UsesEnhancedSourceCoordination { + + + private static final Logger log = LoggerFactory.getLogger(SaasSourcePlugin.class); + private final PluginMetrics pluginMetrics; + private final PluginFactory pluginFactory; + + private final AcknowledgementSetManager acknowledgementSetManager; + + private final ExecutorService executorService; + private final SaasSourceConfig sourceConfig; + private final Crawler crawler; + private final String sourcePluginName; + private EnhancedSourceCoordinator coordinator; + private Buffer> buffer; + + + public SaasSourcePlugin(final String sourcePluginName, + final PluginMetrics pluginMetrics, + final SaasSourceConfig sourceConfig, + final PluginFactory pluginFactory, + final AcknowledgementSetManager acknowledgementSetManager, + final Crawler crawler, + final SaasPluginExecutorServiceProvider executorServiceProvider) { + log.debug("Creating {} Source Plugin", sourcePluginName); + this.sourcePluginName = sourcePluginName; + this.pluginMetrics = pluginMetrics; + this.sourceConfig = sourceConfig; + this.pluginFactory = pluginFactory; + this.crawler = crawler; + + this.acknowledgementSetManager = acknowledgementSetManager; + this.executorService = executorServiceProvider.get(); + } + + + @Override + public void start(Buffer> buffer) { + Objects.requireNonNull(coordinator); + log.info("Starting {} Source Plugin", sourcePluginName); + this.buffer = buffer; + + boolean isPartitionCreated = coordinator.createPartition(new LeaderPartition()); + log.debug("Leader partition creation status: {}", isPartitionCreated); + + Runnable leaderScheduler = new LeaderScheduler(coordinator, this, crawler); + this.executorService.submit(leaderScheduler); + //Register worker threads + for (int i = 0; i < sourceConfig.DEFAULT_NUMBER_OF_WORKERS; i++) { + WorkerScheduler workerScheduler = new WorkerScheduler(buffer, coordinator, sourceConfig, crawler); + this.executorService.submit(new Thread(workerScheduler)); + } + } + + + @Override + public void stop() { + log.info("Stop Source Connector"); + this.executorService.shutdownNow(); + } + + + @Override + public void setEnhancedSourceCoordinator(EnhancedSourceCoordinator sourceCoordinator) { + coordinator = sourceCoordinator; + coordinator.initialize(); + } + + @Override + public Function getPartitionFactory() { + return new PartitionFactory(); + } + + @Override + public ByteDecoder getDecoder() { + return Source.super.getDecoder(); + } + +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/PartitionFactory.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/PartitionFactory.java new file mode 100644 index 0000000000..79ea38c4c2 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/PartitionFactory.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination; + +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition; +import 
org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition; + +import java.util.function.Function; + +/** + * Partition factory for SAAS source plugins. + */ +public class PartitionFactory implements Function { + + + @Override + public EnhancedSourcePartition apply(SourcePartitionStoreItem partitionStoreItem) { + String sourceIdentifier = partitionStoreItem.getSourceIdentifier(); + String partitionType = sourceIdentifier.substring(sourceIdentifier.lastIndexOf('|') + 1); + + if (LeaderPartition.PARTITION_TYPE.equals(partitionType)) { + return new LeaderPartition(partitionStoreItem); + } else if (SaasSourcePartition.PARTITION_TYPE.equals(partitionType)) { + return new SaasSourcePartition(partitionStoreItem); + } else { + // Unable to acquire other partitions. + // Probably we will introduce Global state in the future but for now, we don't expect to reach here. + throw new RuntimeException(String.format("Unable to acquire other partition : %s. " + + "Probably we will introduce Global state in the future but for now, " + + "we don't expect to reach here.", partitionType)); + } + } + + +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/LeaderPartition.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/LeaderPartition.java new file mode 100644 index 0000000000..a54e50d36f --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/LeaderPartition.java @@ -0,0 +1,82 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.LeaderProgressState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; + +/** + *
<p>A LeaderPartition is for some tasks that should be done by a single node only.</p> + * <p>Hence, whichever node owns the lease of this partition acts as the 'leader'.</p> + * <p>In this SaaS source design, a leader node is responsible for:</p> + * <ol> + * <li>The initialization process</li> + * <li>Crawling the source iteratively and creating work for the other worker nodes</li> + * </ol>
+ */ + +public class LeaderPartition extends EnhancedSourcePartition { + + public static final String PARTITION_TYPE = "LEADER"; + public static final String DEFAULT_PARTITION_KEY = "GLOBAL"; + private static final Logger LOG = LoggerFactory.getLogger(LeaderPartition.class); + private static final ObjectMapper objectMapper = new ObjectMapper(new JsonFactory()) + .registerModule(new JavaTimeModule()); + private final LeaderProgressState state; + + public LeaderPartition() { + this.state = new LeaderProgressState(Instant.ofEpochMilli(0)); + } + + public LeaderPartition(SourcePartitionStoreItem sourcePartitionStoreItem) { + setSourcePartitionStoreItem(sourcePartitionStoreItem); + this.state = convertToPartitionState(sourcePartitionStoreItem.getPartitionProgressState()); + } + + @Override + public String getPartitionType() { + return PARTITION_TYPE; + } + + @Override + public String getPartitionKey() { + return DEFAULT_PARTITION_KEY; + } + + @Override + public Optional getProgressState() { + return Optional.of(state); + } + + public void setLeaderProgressState(LeaderProgressState state) { + this.state.setLastPollTime(state.getLastPollTime()); + } + + /** + * Helper method to convert progress state. + * This is because the state is currently stored as a String in the coordination store. + * + * @param serializedPartitionProgressState serialized value of the partition progress state + * @return returns the converted value of the progress state + */ + public LeaderProgressState convertToPartitionState(final String serializedPartitionProgressState) { + if (Objects.isNull(serializedPartitionProgressState)) { + return null; + } + try { + return objectMapper.readValue(serializedPartitionProgressState, LeaderProgressState.class); + } catch (final JsonProcessingException e) { + LOG.error("Unable to convert string to partition progress state class ", e); + return null; + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/SaasSourcePartition.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/SaasSourcePartition.java new file mode 100644 index 0000000000..aa63a411b4 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/SaasSourcePartition.java @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition; + +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState; + +import java.util.Optional; + +/** + * An SAAS source partition represents a chunk of work. 
+ * The source identifier contains keyword 'SAAS-WORKER' + */ +public class SaasSourcePartition extends EnhancedSourcePartition { + + public static final String PARTITION_TYPE = "SAAS-WORKER"; + private final SaasWorkerProgressState state; + private final String partitionKey; + + public SaasSourcePartition(final SourcePartitionStoreItem sourcePartitionStoreItem) { + this.partitionKey = sourcePartitionStoreItem.getSourcePartitionKey(); + + setSourcePartitionStoreItem(sourcePartitionStoreItem); + this.state = convertStringToPartitionProgressState(SaasWorkerProgressState.class, sourcePartitionStoreItem.getPartitionProgressState()); + } + + public SaasSourcePartition(final SaasWorkerProgressState state, + String partitionKey) { + this.state = state; + this.partitionKey = partitionKey; + } + + @Override + public String getPartitionType() { + return PARTITION_TYPE; + } + + @Override + public String getPartitionKey() { + return this.partitionKey; + } + + @Override + public Optional getProgressState() { + if (state != null) { + return Optional.of(state); + } + return Optional.empty(); + } + +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java new file mode 100644 index 0000000000..62803c6217 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java @@ -0,0 +1,101 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler; + +import lombok.Setter; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.Crawler; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.SaasSourcePlugin; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.LeaderProgressState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; + +public class LeaderScheduler implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(LeaderScheduler.class); + + /** + * Default duration to extend the timeout of lease + */ + private static final int DEFAULT_EXTEND_LEASE_MINUTES = 3; + + /** + * Default interval to run lease check and shard discovery + */ + private static final Duration DEFAULT_LEASE_INTERVAL = Duration.ofMinutes(1); + + private final EnhancedSourceCoordinator coordinator; + private final SaasSourcePlugin sourcePlugin; + private final Crawler crawler; + @Setter + private Duration leaseInterval; + private LeaderPartition leaderPartition; + + public LeaderScheduler(EnhancedSourceCoordinator coordinator, + SaasSourcePlugin sourcePlugin, + Crawler crawler) { + this.coordinator = coordinator; + this.leaseInterval = DEFAULT_LEASE_INTERVAL; + this.sourcePlugin = sourcePlugin; + this.crawler = crawler; + } + + @Override + public void run() { + LOG.debug("Starting Leader Scheduler for initialization and 
source partition discovery"); + + while (!Thread.currentThread().isInterrupted()) { + try { + // Try to acquire the lease if not owned. + if (leaderPartition == null) { + final Optional sourcePartition = coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE); + if (sourcePartition.isPresent()) { + LOG.info("Running as a LEADER node"); + leaderPartition = (LeaderPartition) sourcePartition.get(); + } + } + // Once owned, run Normal LEADER node process. + // May want to quit this scheduler if we don't want to monitor future changes + if (leaderPartition != null) { + LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get(); + Instant lastPollTime = leaderProgressState.getLastPollTime(); + + //Start crawling and create child partitions + Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator); + leaderProgressState.setLastPollTime(updatedPollTime); + leaderPartition.setLeaderProgressState(leaderProgressState); + coordinator.saveProgressStateForPartition(leaderPartition, null); + } + + } catch (Exception e) { + LOG.error("Exception occurred in primary scheduling loop", e); + } finally { + if (leaderPartition != null) { + // Extend the timeout + // will always be a leader until shutdown + try { + coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES)); + } catch (final Exception e) { + LOG.error("Failed to save Leader partition state. This process will retry."); + } + } + try { + Thread.sleep(leaseInterval.toMillis()); + } catch (final InterruptedException e) { + LOG.info("InterruptedException occurred"); + Thread.currentThread().interrupt(); + } + } + } + // Should Stop + LOG.warn("Quitting Leader Scheduler"); + if (leaderPartition != null) { + coordinator.giveUpPartition(leaderPartition); + } + } + +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java new file mode 100644 index 0000000000..6a94471854 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java @@ -0,0 +1,87 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler; + +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.Crawler; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.SaasSourceConfig; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Optional; + +/** + * Worker class for executing the partitioned work created while crawling a source. 
+ * Each SAAS source will provide their own specific source extraction logic. + */ +public class WorkerScheduler implements Runnable { + + private static final Logger log = LoggerFactory.getLogger(WorkerScheduler.class); + private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000; + private static final Duration DEFAULT_SLEEP_DURATION_MILLIS = Duration.ofMillis(10000); + + private final EnhancedSourceCoordinator sourceCoordinator; + private final SaasSourceConfig sourceConfig; + private final Crawler crawler; + private final Buffer> buffer; + + + public WorkerScheduler(Buffer> buffer, + EnhancedSourceCoordinator sourceCoordinator, + SaasSourceConfig sourceConfig, + Crawler crawler) { + this.sourceCoordinator = sourceCoordinator; + this.sourceConfig = sourceConfig; + this.crawler = crawler; + this.buffer = buffer; + } + + @Override + public void run() { + log.info("Worker thread started"); + log.info("Processing Partitions"); + while (!Thread.currentThread().isInterrupted()) { + try { + // Get the next available partition from the coordinator + Optional partition = + sourceCoordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE); + if (partition.isPresent()) { + // Process the partition (source extraction logic) + processPartition(partition.get(), buffer, sourceConfig); + + } else { + log.debug("No partition available. This thread will sleep for {}", DEFAULT_SLEEP_DURATION_MILLIS); + try { + Thread.sleep(DEFAULT_SLEEP_DURATION_MILLIS.toMillis()); + } catch (final InterruptedException e) { + log.info("InterruptedException occurred"); + break; + } + } + } catch (Exception e) { + log.error("Error processing partition", e); + try { + Thread.sleep(RETRY_BACKOFF_ON_EXCEPTION_MILLIS); + } catch (InterruptedException ex) { + log.warn("Thread interrupted while waiting to retry", ex); + } + } + } + log.warn("SourceItemWorker Scheduler is interrupted, looks like shutdown has triggered"); + } + + private void processPartition(EnhancedSourcePartition partition, Buffer> buffer, SaasSourceConfig sourceConfig) { + // Implement your source extraction logic here + // Update the partition state or commit the partition as needed + // Commit the partition to mark it as processed + if (partition.getProgressState().isPresent()) { + crawler.executePartition((SaasWorkerProgressState) partition.getProgressState().get(), buffer, sourceConfig); + } + sourceCoordinator.completePartition(partition); + } +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/LeaderProgressState.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/LeaderProgressState.java new file mode 100644 index 0000000000..062753cc03 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/LeaderProgressState.java @@ -0,0 +1,26 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import lombok.Getter; +import lombok.Setter; +import org.opensearch.dataprepper.plugins.source.source_crawler.util.CustomInstantDeserializer; + +import java.time.Instant; + +@Setter +@Getter +public class LeaderProgressState { + + @JsonProperty("initialized") + private boolean 
initialized = false; + + @JsonProperty("last_poll_time") + @JsonDeserialize(using = CustomInstantDeserializer.class) + private Instant lastPollTime; + + public LeaderProgressState(@JsonProperty("last_poll_time") final Instant lastPollTime) { + this.lastPollTime = lastPollTime; + } +} + diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/SaasWorkerProgressState.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/SaasWorkerProgressState.java new file mode 100644 index 0000000000..2dbfa28a67 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/SaasWorkerProgressState.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import lombok.Getter; +import lombok.Setter; +import org.opensearch.dataprepper.plugins.source.source_crawler.util.CustomInstantDeserializer; + +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Getter +@Setter +public class SaasWorkerProgressState { + + @JsonProperty("totalItems") + private int totalItems; + + @JsonProperty("loadedItems") + private int loadedItems; + + @JsonProperty("exportStartTime") + @JsonDeserialize(using = CustomInstantDeserializer.class) + private Instant exportStartTime; + + private Map keyAttributes = new HashMap<>(); + + @JsonProperty("itemIds") + private List itemIds; + +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/model/ItemInfo.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/model/ItemInfo.java new file mode 100644 index 0000000000..48a063d64a --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/model/ItemInfo.java @@ -0,0 +1,53 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.model; + + +import java.time.Instant; +import java.util.Map; + + +public interface ItemInfo { + + /** + * Use this field to store the primary item of a repository. The primary item of a repository is + * something that can be fetched/queried/obtained from the source service using just its item ID. + */ + String getItemId(); + + /** + * Use this field to store item metadata. Item metadata can be any information other than the item + * contents itself that can be used to apply regex filtering, change data capture, etc. The general + * assumption here is that fetching metadata should be faster than fetching the entire item. + */ + Map getMetadata(); + + /** + * Process your change log events serially (preferably in a single thread) and ensure that you are + * applying monotonically increasing timestamps. If you don't do that, the SDK could miss the latest + * updates, since it processes events out of order and relies on this member to decide which change + * log events to keep and which ones to discard. 
+ */ + Instant getEventTime(); + + String getPartitionKey(); + + /** + * Service-specific unique ID of this item. + * + * @return String value indicating the unique ID of this item. + */ + String getId(); + + /** + * Key attributes related to this item. + * + * @return A map of key attributes of this item. + */ + Map getKeyAttributes(); + + /** + * Service-specific last modified time of this item. + * + * @return Instant when this item was last modified + */ + Instant getLastModifiedAt(); +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/util/CustomInstantDeserializer.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/util/CustomInstantDeserializer.java new file mode 100644 index 0000000000..ffc6b0f485 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/util/CustomInstantDeserializer.java @@ -0,0 +1,16 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.util; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; + +import java.io.IOException; +import java.time.Instant; + +public class CustomInstantDeserializer extends JsonDeserializer { + @Override + public Instant deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + long millis = p.getLongValue(); + return Instant.ofEpochMilli(millis); + } +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java new file mode 100644 index 0000000000..908eac5546 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java @@ -0,0 +1,162 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.base; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState; +import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; +import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo; + +import java.lang.reflect.Field; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static 
org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +@ExtendWith(MockitoExtension.class) +public class CrawlerTest { + @Mock + private SaasSourceConfig sourceConfig; + + @Mock + private EnhancedSourceCoordinator coordinator; + + @Mock + private Buffer<Record<Event>> buffer; + + @Mock + private CrawlerClient client; + + @Mock + private SaasWorkerProgressState state; + + private Crawler crawler; + + @BeforeEach + public void setup() { + crawler = new Crawler(client); + } + + @Test + public void crawlerConstructionTest() { + assertNotNull(crawler); + } + + @Test + public void executePartitionTest() { + crawler.executePartition(state, buffer, sourceConfig); + verify(client).executePartition(state, buffer, sourceConfig); + } + + @Test + void testCrawlWithEmptyList() { + Instant lastPollTime = Instant.ofEpochMilli(0); + when(client.listItems()).thenReturn(Collections.emptyIterator()); + crawler.crawl(lastPollTime, coordinator); + verify(coordinator, never()).createPartition(any(SaasSourcePartition.class)); + } + + @Test + void testCrawlWithNonEmptyList() throws NoSuchFieldException, IllegalAccessException { + Instant lastPollTime = Instant.ofEpochMilli(0); + List<ItemInfo> itemInfoList = new ArrayList<>(); + int maxItemsPerPage = getMaxItemsPerPage(); + for (int i = 0; i < maxItemsPerPage; i++) { + itemInfoList.add(new TestItemInfo("itemId")); + } + when(client.listItems()).thenReturn(itemInfoList.iterator()); + crawler.crawl(lastPollTime, coordinator); + verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class)); + } + + @Test + void testCrawlWithMultiplePartitions() throws NoSuchFieldException, IllegalAccessException { + Instant lastPollTime = Instant.ofEpochMilli(0); + List<ItemInfo> itemInfoList = new ArrayList<>(); + int maxItemsPerPage = getMaxItemsPerPage(); + for (int i = 0; i < maxItemsPerPage + 1; i++) { + itemInfoList.add(new TestItemInfo("testId")); + } + when(client.listItems()).thenReturn(itemInfoList.iterator()); + crawler.crawl(lastPollTime, coordinator); + verify(coordinator, times(2)).createPartition(any(SaasSourcePartition.class)); + } + + @Test + void testCrawlWithNullItemsInList() throws NoSuchFieldException, IllegalAccessException { + Instant lastPollTime = Instant.ofEpochMilli(0); + List<ItemInfo> itemInfoList = new ArrayList<>(); + int maxItemsPerPage = getMaxItemsPerPage(); + itemInfoList.add(null); + for (int i = 0; i < maxItemsPerPage - 1; i++) { + itemInfoList.add(new TestItemInfo("testId")); + } + when(client.listItems()).thenReturn(itemInfoList.iterator()); + crawler.crawl(lastPollTime, coordinator); + verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class)); + } + + @Test + void testUpdatingPollTimeNullMetaData() { + Instant lastPollTime = Instant.ofEpochMilli(0); + List<ItemInfo> itemInfoList = new ArrayList<>(); + ItemInfo testItem = createTestItemInfo("1"); + itemInfoList.add(testItem); + when(client.listItems()).thenReturn(itemInfoList.iterator()); + Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator); + assertNotEquals(Instant.ofEpochMilli(0), updatedPollTime); + } + + @Test + void testUpdatedPollTimeNewItemCreatedLarger() { + Instant lastPollTime = Instant.ofEpochMilli(0); + List<ItemInfo> itemInfoList = new ArrayList<>(); + ItemInfo testItem = createTestItemInfo("1"); + itemInfoList.add(testItem); + when(client.listItems()).thenReturn(itemInfoList.iterator()); + Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
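 + // TestItemInfo pins getLastModifiedAt() at Instant.ofEpochMilli(10) (see TestItemInfo below), so the crawler is expected to adopt it as the updated poll time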
 + assertEquals(Instant.ofEpochMilli(10), updatedPollTime); + } + + @Test + void testUpdatedPollTimeNewItemUpdatedLarger() { + Instant lastPollTime = Instant.ofEpochMilli(0); + List<ItemInfo> itemInfoList = new ArrayList<>(); + ItemInfo testItem = createTestItemInfo("1"); + itemInfoList.add(testItem); + when(client.listItems()).thenReturn(itemInfoList.iterator()); + Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator); + assertEquals(Instant.ofEpochMilli(10), updatedPollTime); + } + + + private int getMaxItemsPerPage() throws NoSuchFieldException, IllegalAccessException { + Field maxItemsPerPageField = Crawler.class.getDeclaredField("maxItemsPerPage"); + maxItemsPerPageField.setAccessible(true); + return (int) maxItemsPerPageField.get(null); + } + + private ItemInfo createTestItemInfo(String id) { + return new TestItemInfo(id, new HashMap<>(), Instant.now()); + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasPluginExecutorServiceProviderTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasPluginExecutorServiceProviderTest.java new file mode 100644 index 0000000000..d92dcdd2ba --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasPluginExecutorServiceProviderTest.java @@ -0,0 +1,69 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.base; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class SaasPluginExecutorServiceProviderTest { + + private SaasPluginExecutorServiceProvider provider; + private ExecutorService executorService; + + private SaasPluginExecutorServiceProvider provider2; + @Mock + private ExecutorService mockExecutorService; + + @BeforeEach + void setUp() { + provider = new SaasPluginExecutorServiceProvider(); + executorService = provider.get(); + } + + @AfterEach + void tearDown() { + provider.terminateExecutor(); + } + + @Test + void testConstruction() { + assertNotNull(executorService); + assertNotNull(provider); + } + + @Test + void testTerminateExecutor() { + provider.terminateExecutor(); + assertTrue(executorService.isShutdown()); + assertTrue(executorService.isTerminated()); + } + + @Test + void terminateExecutorInterruptionTest() throws InterruptedException { + provider2 = new SaasPluginExecutorServiceProvider(mockExecutorService); + when(mockExecutorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException()); + AtomicBoolean wasInterrupted = new AtomicBoolean(false); + + Thread testThread = new Thread(() -> { + provider2.terminateExecutor(); + wasInterrupted.set(Thread.interrupted()); + }); + testThread.start(); + testThread.join(); + + 
assertTrue(wasInterrupted.get()); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasSourcePluginTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasSourcePluginTest.java new file mode 100644 index 0000000000..d1ed74db81 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/SaasSourcePluginTest.java @@ -0,0 +1,138 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.base; + + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.LeaderScheduler; + +import java.util.concurrent.ExecutorService; +import java.util.function.Function; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +@ExtendWith(MockitoExtension.class) +public class SaasSourcePluginTest { + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private PluginFactory pluginFactory; + + @Mock + private Crawler crawler; + + @Mock + private SaasPluginExecutorServiceProvider executorServiceProvider; + + @Mock + private ExecutorService executorService; + + @Mock + private SaasSourceConfig sourceConfig; + + @Mock + private Buffer<Record<Event>> buffer; + + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + + + @Mock + private EnhancedSourceCoordinator sourceCoordinator; + + private TestSaasSourcePlugin saasSourcePlugin; + + @BeforeEach + void setUp() { + when(executorServiceProvider.get()).thenReturn(executorService); + saasSourcePlugin = new TestSaasSourcePlugin(pluginMetrics, sourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider); + } + + @Test + void pluginConstructorTest() { + assertNotNull(saasSourcePlugin); + verify(executorServiceProvider).get(); + } + + @Test + void testSetEnhancedSourceCoordinator() { + saasSourcePlugin.setEnhancedSourceCoordinator(sourceCoordinator); + verify(sourceCoordinator).initialize(); + } + + @Test + void areAcknowledgementsEnabledTest() { + assertFalse(saasSourcePlugin.areAcknowledgementsEnabled()); + } + + 
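// start() should submit the schedulers without shutting down the shared executor +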
@Test + void startTest() { + saasSourcePlugin.setEnhancedSourceCoordinator(sourceCoordinator); + saasSourcePlugin.start(buffer); + assertFalse(executorService.isShutdown()); + assertFalse(executorService.isTerminated()); + } + + @Test + void testExecutorServiceSchedulersSubmitted() { + saasSourcePlugin.setEnhancedSourceCoordinator(sourceCoordinator); + saasSourcePlugin.start(buffer); + verify(executorService, times(1)).submit(any(LeaderScheduler.class)); + verify(executorService, times(SaasSourceConfig.DEFAULT_NUMBER_OF_WORKERS)) + .submit(any(Thread.class)); + } + + @Test + void testStop() { + saasSourcePlugin.stop(); + verify(executorService).shutdownNow(); + } + + @Test + void testGetPartitionFactory() { + Function<SourcePartitionStoreItem, EnhancedSourcePartition> factory = saasSourcePlugin.getPartitionFactory(); + assertNotNull(factory); + } + + @Test + void testGetDecoder() { + ByteDecoder decoder = saasSourcePlugin.getDecoder(); + assertNull(decoder); + } + + @Nested + public class TestSaasSourcePlugin extends SaasSourcePlugin { + public TestSaasSourcePlugin(final PluginMetrics pluginMetrics, + final SaasSourceConfig sourceConfig, + final PluginFactory pluginFactory, + final AcknowledgementSetManager acknowledgementSetManager, + final Crawler crawler, + final SaasPluginExecutorServiceProvider executorServiceProvider) { + super("TestcasePlugin", pluginMetrics, sourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider); + } + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/PartitionFactoryTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/PartitionFactoryTest.java new file mode 100644 index 0000000000..af60c1e98b --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/PartitionFactoryTest.java @@ -0,0 +1,104 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination; + + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.LeaderProgressState; + +import java.time.Instant; +import java.util.Optional; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class PartitionFactoryTest { + + private final String sourceIdentifier = UUID.randomUUID().toString(); + @Mock + private SourcePartitionStoreItem sourcePartitionStoreItem; + + @Test + void testCreateLeaderPartition() { + String sourceId = sourceIdentifier + "|" + LeaderPartition.PARTITION_TYPE; + 
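// the factory derives the partition type from the "<sourceIdentifier>|<PARTITION_TYPE>" suffix of the stored identifier +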
when(sourcePartitionStoreItem.getSourceIdentifier()).thenReturn(sourceId); + + String state = "{\"last_poll_time\":1729391235717}"; + when(sourcePartitionStoreItem.getPartitionProgressState()).thenReturn(state); + + + PartitionFactory factory = new PartitionFactory(); + EnhancedSourcePartition sourcePartition = factory.apply(sourcePartitionStoreItem); + assertThat(sourcePartition, notNullValue()); + LeaderPartition leaderPartition = (LeaderPartition) sourcePartition; + assertThat(leaderPartition.getPartitionType(), equalTo(LeaderPartition.PARTITION_TYPE)); + assertThat(leaderPartition.getPartitionKey(), equalTo(LeaderPartition.DEFAULT_PARTITION_KEY)); + + Optional<LeaderProgressState> progressState = leaderPartition.getProgressState(); + assertThat(progressState.isPresent(), equalTo(true)); + assertThat(progressState.get().getLastPollTime(), equalTo(Instant.ofEpochMilli(1729391235717L))); + + //Update leader progress state and then verify + LeaderProgressState updatedState = new LeaderProgressState(Instant.ofEpochMilli(12345L)); + leaderPartition.setLeaderProgressState(updatedState); + assertThat(progressState.get().getLastPollTime(), equalTo(Instant.ofEpochMilli(12345L))); + } + + @Test + void testCreateWorkerPartition() { + final String saasProject = "project-1"; + final String projectCategory = "category-1"; + String sourceId = sourceIdentifier + "|" + SaasSourcePartition.PARTITION_TYPE; + String partitionKey = saasProject + "|" + projectCategory + UUID.randomUUID(); + when(sourcePartitionStoreItem.getSourceIdentifier()).thenReturn(sourceId); + when(sourcePartitionStoreItem.getSourcePartitionKey()).thenReturn(partitionKey); + + String state = "{\"keyAttributes\":{\"project\":\"project-1\"},\"totalItems\":0,\"loadedItems\":20,\"exportStartTime\":1729391235717,\"itemIds\":[\"GTMS-25\",\"GTMS-24\"]}"; + when(sourcePartitionStoreItem.getPartitionProgressState()).thenReturn(state); + + PartitionFactory factory = new PartitionFactory(); + EnhancedSourcePartition sourcePartition = factory.apply(sourcePartitionStoreItem); + assertThat(sourcePartition, notNullValue()); + SaasSourcePartition saasSourcePartition = (SaasSourcePartition) sourcePartition; + assertThat(saasSourcePartition.getPartitionType(), equalTo(SaasSourcePartition.PARTITION_TYPE)); + assertThat(saasSourcePartition.getPartitionKey(), equalTo(partitionKey)); + assertThat(saasSourcePartition.getProgressState().isPresent(), equalTo(true)); + } + + @Test + void testCreateWorkerPartitionWithNullState() { + + String sourceId = sourceIdentifier + "|" + SaasSourcePartition.PARTITION_TYPE; + when(sourcePartitionStoreItem.getSourceIdentifier()).thenReturn(sourceId); + + + PartitionFactory factory = new PartitionFactory(); + EnhancedSourcePartition sourcePartition = factory.apply(sourcePartitionStoreItem); + assertThat(sourcePartition, notNullValue()); + SaasSourcePartition saasSourcePartition = (SaasSourcePartition) sourcePartition; + assertThat(saasSourcePartition.getPartitionType(), equalTo(SaasSourcePartition.PARTITION_TYPE)); + assertThat(saasSourcePartition.getProgressState().isPresent(), equalTo(false)); + } + + @Test + void testUnknownPartition() { + + String sourceId = sourceIdentifier + "|" + "UNKNOWN"; + when(sourcePartitionStoreItem.getSourceIdentifier()).thenReturn(sourceId); + + PartitionFactory factory = new PartitionFactory(); + assertThrows(RuntimeException.class, () -> factory.apply(sourcePartitionStoreItem)); + } + + +} diff --git 
a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java new file mode 100644 index 0000000000..48fa9dac16 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java @@ -0,0 +1,115 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.Crawler; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.SaasSourcePlugin; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class LeaderSchedulerTest { + + @Mock + private EnhancedSourceCoordinator coordinator; + @Mock + private SaasSourcePlugin saasSourcePlugin; + @Mock + private Crawler crawler; + + @Test + void testUnableToAcquireLeaderPartition() throws InterruptedException { + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler); + given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.empty()); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(leaderScheduler); + Thread.sleep(100); + executorService.shutdownNow(); + verifyNoInteractions(crawler); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testLeaderPartitionsCreation(boolean initializationState) throws InterruptedException { + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler); + LeaderPartition leaderPartition = new LeaderPartition(); + leaderPartition.getProgressState().get().setInitialized(initializationState); + leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L)); + given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition)); + doThrow(RuntimeException.class).when(coordinator).saveProgressStateForPartition(any(LeaderPartition.class), any(Duration.class)); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(leaderScheduler); + + Thread.sleep(100); + 
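// stop the scheduler after it has had time to complete at least one iteration +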
executorService.shutdownNow(); + + // Check if crawler was invoked and updated leader lease renewal time + verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator); + verify(coordinator).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class)); + + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testExceptionWhileAcquiringLeaderPartition(boolean initializationState) throws InterruptedException { + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler); + LeaderPartition leaderPartition = new LeaderPartition(); + leaderPartition.getProgressState().get().setInitialized(initializationState); + leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L)); + given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willThrow(RuntimeException.class); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(leaderScheduler); + + Thread.sleep(100); + executorService.shutdownNow(); + + // The crawler should never be invoked when acquiring the leader partition throws + verifyNoInteractions(crawler); + } + + @Test + void testWhileLoopRunningAfterTheSleep() throws InterruptedException { + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler); + leaderScheduler.setLeaseInterval(Duration.ofMillis(10)); + LeaderPartition leaderPartition = new LeaderPartition(); + leaderPartition.getProgressState().get().setInitialized(false); + leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L)); + when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class))).thenReturn(Instant.ofEpochMilli(10)); + when(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)) + .thenReturn(Optional.of(leaderPartition)) + .thenThrow(RuntimeException.class); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(leaderScheduler); + + //Wait long enough for several of the 10 ms lease intervals configured above to elapse + Thread.sleep(100); + executorService.shutdownNow(); + + // The loop should keep running after the sleep, so the crawler is invoked repeatedly + verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class)); + } +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java new file mode 100644 index 0000000000..00d77ddecb --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java @@ -0,0 +1,151 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import 
org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.Crawler; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.SaasSourceConfig; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.PartitionFactory; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class WorkerSchedulerTest { + + @Mock + Buffer<Record<Event>> buffer; + @Mock + private EnhancedSourceCoordinator coordinator; + @Mock + private SaasSourceConfig sourceConfig; + @Mock + private Crawler crawler; + + @Mock + private SourcePartitionStoreItem sourcePartitionStoreItem; + + + @Test + void testUnableToAcquireWorkerPartition() throws InterruptedException { + WorkerScheduler workerScheduler = new WorkerScheduler(buffer, coordinator, sourceConfig, crawler); + given(coordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE)).willReturn(Optional.empty()); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(workerScheduler); + Thread.sleep(100); + executorService.shutdownNow(); + verifyNoInteractions(crawler); + } + + @Test + void testWorkerPartitionProcessing() throws InterruptedException { + WorkerScheduler workerScheduler = new WorkerScheduler(buffer, coordinator, sourceConfig, crawler); + + String sourceId = UUID.randomUUID() + "|" + SaasSourcePartition.PARTITION_TYPE; + String state = "{\"keyAttributes\":{\"project\":\"project-1\"},\"totalItems\":0,\"loadedItems\":20,\"exportStartTime\":1729391235717,\"itemIds\":[\"GTMS-25\",\"GTMS-24\"]}"; + when(sourcePartitionStoreItem.getPartitionProgressState()).thenReturn(state); + when(sourcePartitionStoreItem.getSourceIdentifier()).thenReturn(sourceId); + PartitionFactory factory = new PartitionFactory(); + EnhancedSourcePartition sourcePartition = factory.apply(sourcePartitionStoreItem); + given(coordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE)).willReturn(Optional.of(sourcePartition)); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(workerScheduler); + + Thread.sleep(50); + executorService.shutdownNow(); + + // The crawler should execute the partition, and the coordinator should then mark it complete + SaasWorkerProgressState stateObj = (SaasWorkerProgressState) sourcePartition.getProgressState().get(); + verify(crawler, atLeast(1)).executePartition(stateObj, buffer, sourceConfig); + verify(coordinator, atLeast(1)).completePartition(eq(sourcePartition)); + } + + @Test + void testEmptyProgressState() throws InterruptedException { + WorkerScheduler workerScheduler = new WorkerScheduler(buffer, coordinator, sourceConfig, crawler); + + String sourceId = UUID.randomUUID() + "|" + SaasSourcePartition.PARTITION_TYPE; + 
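// a stored item whose progress state is null should be completed without ever invoking the crawler +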
when(sourcePartitionStoreItem.getPartitionProgressState()).thenReturn(null); + when(sourcePartitionStoreItem.getSourceIdentifier()).thenReturn(sourceId); + PartitionFactory factory = new PartitionFactory(); + EnhancedSourcePartition sourcePartition = factory.apply(sourcePartitionStoreItem); + given(coordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE)).willReturn(Optional.of(sourcePartition)); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(workerScheduler); + + Thread.sleep(50); + executorService.shutdownNow(); + + // With no progress state, the partition is completed without invoking the crawler + verifyNoInteractions(crawler); + verify(coordinator, atLeast(1)).completePartition(eq(sourcePartition)); + } + + @Test + void testExceptionWhileAcquiringWorkerPartition() throws InterruptedException { + WorkerScheduler workerScheduler = new WorkerScheduler(buffer, coordinator, sourceConfig, crawler); + given(coordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE)).willThrow(RuntimeException.class); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(workerScheduler); + + Thread.sleep(10); + executorService.shutdownNow(); + + // Crawler shouldn't be invoked in this case + verifyNoInteractions(crawler); + } + + @Test + void testWhenNoPartitionToWorkOn() throws InterruptedException { + WorkerScheduler workerScheduler = new WorkerScheduler(buffer, coordinator, sourceConfig, crawler); + given(coordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE)).willReturn(Optional.empty()); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(workerScheduler); + + //A brief wait is enough for the scheduler to attempt at least one polling iteration + Thread.sleep(10); + executorService.shutdownNow(); + + // Crawler shouldn't be invoked in this case + verifyNoInteractions(crawler); + } + + @Test + void testRetryBackOffTriggeredWhenExceptionOccurred() throws InterruptedException { + WorkerScheduler workerScheduler = new WorkerScheduler(buffer, coordinator, sourceConfig, crawler); + given(coordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE)).willThrow(RuntimeException.class); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(workerScheduler); + + //A brief wait is enough for the scheduler to hit the exception and enter its retry backoff + Thread.sleep(10); + executorService.shutdownNow(); + + // Crawler shouldn't be invoked in this case + verifyNoInteractions(crawler); + } +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/LeaderProgressStateTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/LeaderProgressStateTest.java new file mode 100644 index 0000000000..ed6b10ef78 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/LeaderProgressStateTest.java @@ -0,0 +1,34 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; + +import 
java.time.Instant; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public class LeaderProgressStateTest { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + @Test + void testDefaultValues() throws JsonProcessingException { + String state = "{}"; + LeaderProgressState leaderProgressState = objectMapper.readValue(state, LeaderProgressState.class); + assertNull(leaderProgressState.getLastPollTime()); + assertFalse(leaderProgressState.isInitialized()); + } + + @Test + void testInitializedValues() throws JsonProcessingException { + String state = "{\"last_poll_time\":1729391235717, \"initialized\": true}"; + LeaderProgressState leaderProgressState = objectMapper.readValue(state, LeaderProgressState.class); + assertEquals(Instant.ofEpochMilli(1729391235717L), leaderProgressState.getLastPollTime()); + assertTrue(leaderProgressState.isInitialized()); + } +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/SaasWorkerProgressStateTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/SaasWorkerProgressStateTest.java new file mode 100644 index 0000000000..c7a4f48a43 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/SaasWorkerProgressStateTest.java @@ -0,0 +1,44 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.Test; + +import java.time.Instant; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SaasWorkerProgressStateTest { + + private static final ObjectMapper objectMapper = new ObjectMapper(new JsonFactory()) + .registerModule(new JavaTimeModule()); + + @Test + void testDefaultValues() throws JsonProcessingException { + String state = "{}"; + SaasWorkerProgressState workerProgressState = objectMapper.readValue(state, SaasWorkerProgressState.class); + assertEquals(0, workerProgressState.getTotalItems()); + assertEquals(0, workerProgressState.getLoadedItems()); + assertNotNull(workerProgressState.getKeyAttributes()); + assertTrue(workerProgressState.getKeyAttributes().isEmpty()); + assertNull(workerProgressState.getExportStartTime()); + assertNull(workerProgressState.getItemIds()); + } + + @Test + void testInitializedValues() throws JsonProcessingException { + String state = "{\"keyAttributes\":{\"project\":\"project-1\"},\"totalItems\":10,\"loadedItems\":20,\"exportStartTime\":1729391235717,\"itemIds\":[\"GTMS-25\",\"GTMS-24\"]}"; + SaasWorkerProgressState workerProgressState = objectMapper.readValue(state, SaasWorkerProgressState.class); + assertEquals(10, workerProgressState.getTotalItems()); + assertEquals(20, 
workerProgressState.getLoadedItems()); + assertNotNull(workerProgressState.getKeyAttributes()); + assertEquals(Instant.ofEpochMilli(1729391235717L), workerProgressState.getExportStartTime()); + assertNotNull(workerProgressState.getItemIds()); + assertEquals(2, workerProgressState.getItemIds().size()); + } +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/model/ItemInfoTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/model/ItemInfoTest.java new file mode 100644 index 0000000000..ddf5c6123e --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/model/ItemInfoTest.java @@ -0,0 +1,41 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.model; + +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.Map; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ItemInfoTest { + + @Test + void testItemInfoSimpleConstructor() { + String itemId = UUID.randomUUID().toString(); + TestItemInfo itemInfo = new TestItemInfo(itemId); + assertEquals(itemId, itemInfo.getItemId()); + assertNull(itemInfo.getMetadata()); + assertEquals("partitionKey", itemInfo.getPartitionKey()); + assertEquals("id", itemInfo.getId()); + assertTrue(itemInfo.getKeyAttributes().isEmpty()); + } + + @Test + void testItemInfo() { + String itemId = UUID.randomUUID().toString(); + TestItemInfo itemInfo = new TestItemInfo(itemId, Map.of("k1", "v1"), Instant.ofEpochMilli(1L)); + + assertEquals(itemId, itemInfo.getItemId()); + assertFalse(itemInfo.getMetadata().isEmpty()); + assertEquals("v1", itemInfo.getMetadata().get("k1")); + assertEquals(Instant.ofEpochMilli(1L), itemInfo.getEventTime()); + assertEquals("partitionKey", itemInfo.getPartitionKey()); + assertEquals("id", itemInfo.getId()); + assertTrue(itemInfo.getKeyAttributes().isEmpty()); + + } +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/model/TestItemInfo.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/model/TestItemInfo.java new file mode 100644 index 0000000000..6e1a195da3 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/model/TestItemInfo.java @@ -0,0 +1,56 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.model; + +import java.time.Instant; +import java.util.Map; + +public class TestItemInfo implements ItemInfo { + + String itemId; + Map<String, String> metadata; + Instant eventTime; + + public TestItemInfo(String itemId, Map<String, String> metadata, Instant eventTime) { + this.itemId = itemId; + this.metadata = metadata; + this.eventTime = eventTime; + } + + public TestItemInfo(String itemId) { + this.itemId = itemId; + } + + @Override + public String getItemId() { + return itemId; + } + + @Override + public Map<String, String> getMetadata() { + return this.metadata; + } + + @Override + public Instant getEventTime() { + return this.eventTime; + } + + @Override + public String 
getPartitionKey() { + return "partitionKey"; + } + + @Override + public String getId() { + return "id"; + } + + @Override + public Map<String, Object> getKeyAttributes() { + return Map.of(); + } + + @Override + public Instant getLastModifiedAt() { + return Instant.ofEpochMilli(10); + } +} diff --git a/settings.gradle b/settings.gradle index 0acecb8122..f8fc52a4d3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -186,3 +186,7 @@ include 'data-prepper-plugins:aws-lambda' include 'data-prepper-plugin-schema' include 'data-prepper-plugins:kinesis-source' include 'data-prepper-plugins:opensearch-api-source' +include 'data-prepper-plugins:saas-source-plugins' +include 'data-prepper-plugins:saas-source-plugins:source-crawler' +include 'data-prepper-plugins:saas-source-plugins:jira-source' +