From f825a27381a31a1ec57276ae2c5cca84c2f6e9e2 Mon Sep 17 00:00:00 2001 From: Santhosh Gandhe <1909520+san81@users.noreply.github.com> Date: Tue, 21 Jan 2025 17:10:04 -0800 Subject: [PATCH] introduced boolean to control end to end Acknowledgment Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> --- .../dataprepper/plugins/source/jira/JiraClient.java | 10 +++++++--- .../plugins/source/jira/JiraSourceConfig.java | 7 +++++++ .../plugins/source/jira/rest/auth/JiraOauthConfig.java | 1 + .../source_crawler/base/CrawlerSourceConfig.java | 7 +++++++ .../coordination/scheduler/WorkerScheduler.java | 5 ++++- 5 files changed, 26 insertions(+), 4 deletions(-) diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraClient.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraClient.java index f9bb5cad17..0785741248 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraClient.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraClient.java @@ -131,9 +131,13 @@ public void executePartition(SaasWorkerProgressState state, .collect(Collectors.toList()); try { - recordsToWrite.forEach(eventRecord -> acknowledgementSet.add(eventRecord.getData())); - buffer.writeAll(recordsToWrite, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis()); - acknowledgementSet.complete(); + if (configuration.isAcknowledgments()) { + recordsToWrite.forEach(eventRecord -> acknowledgementSet.add(eventRecord.getData())); + buffer.writeAll(recordsToWrite, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis()); + acknowledgementSet.complete(); + } else { + buffer.writeAll(recordsToWrite, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis()); + } } catch (Exception e) { throw new RuntimeException(e); } diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java index df5cd70f0b..f58f37f9dd 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java @@ -59,6 +59,13 @@ public class JiraSourceConfig implements CrawlerSourceConfig { @JsonProperty("backoff_time") private Duration backOff = DEFAULT_BACKOFF_MILLIS; + /** + * Boolean property indicating end to end acknowledgments state + */ + @JsonProperty("acknowledgments") + @Getter + private boolean acknowledgments = false; + public String getAccountUrl() { return this.getHosts().get(0); } diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/auth/JiraOauthConfig.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/auth/JiraOauthConfig.java index 0bf6614e6a..02d63bb88c 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/auth/JiraOauthConfig.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/auth/JiraOauthConfig.java @@ -154,6 +154,7 @@ public void renewCredentials() { .getAccessToken().getValue(); this.refreshToken = (String) jiraSourceConfig.getAuthenticationConfig() .getOauth2Config().getRefreshToken().getValue(); + this.expireTime = Instant.ofEpochMilli(System.currentTimeMillis() + (expiresInSeconds * 100L)); } throw new RuntimeException("Failed to renew access token message:" + ex.getMessage(), ex); } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java index 18649e052c..77902bdbc7 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java @@ -6,4 +6,11 @@ public interface CrawlerSourceConfig { int DEFAULT_NUMBER_OF_WORKERS = 1; + + /** + * Boolean to indicate if acknowledgments enabled for this source + * + * @return boolean indicating acknowledgement state + */ + boolean isAcknowledgments(); } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java index 2364025a7d..f2fc7e4b40 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java @@ -101,7 +101,10 @@ private void processPartition(EnhancedSourcePartition partition, Buffer