Skip to content

Commit

Permalink
introduced boolean to control end to end Acknowledgment
Browse files Browse the repository at this point in the history
Signed-off-by: Santhosh Gandhe <[email protected]>
  • Loading branch information
san81 committed Jan 22, 2025
1 parent 784106e commit f825a27
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,13 @@ public void executePartition(SaasWorkerProgressState state,
.collect(Collectors.toList());

try {
recordsToWrite.forEach(eventRecord -> acknowledgementSet.add(eventRecord.getData()));
buffer.writeAll(recordsToWrite, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis());
acknowledgementSet.complete();
if (configuration.isAcknowledgments()) {
recordsToWrite.forEach(eventRecord -> acknowledgementSet.add(eventRecord.getData()));
buffer.writeAll(recordsToWrite, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis());
acknowledgementSet.complete();
} else {
buffer.writeAll(recordsToWrite, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ public class JiraSourceConfig implements CrawlerSourceConfig {
@JsonProperty("backoff_time")
private Duration backOff = DEFAULT_BACKOFF_MILLIS;

/**
* Boolean property indicating end to end acknowledgments state
*/
@JsonProperty("acknowledgments")
@Getter
private boolean acknowledgments = false;

public String getAccountUrl() {
return this.getHosts().get(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public void renewCredentials() {
.getAccessToken().getValue();
this.refreshToken = (String) jiraSourceConfig.getAuthenticationConfig()
.getOauth2Config().getRefreshToken().getValue();
this.expireTime = Instant.ofEpochMilli(System.currentTimeMillis() + (expiresInSeconds * 100L));
}
throw new RuntimeException("Failed to renew access token message:" + ex.getMessage(), ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,11 @@
public interface CrawlerSourceConfig {

int DEFAULT_NUMBER_OF_WORKERS = 1;

/**
* Boolean to indicate if acknowledgments enabled for this source
*
* @return boolean indicating acknowledgement state
*/
boolean isAcknowledgments();
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ private void processPartition(EnhancedSourcePartition partition, Buffer<Record<E
// Update the partition state or commit the partition as needed
// Commit the partition to mark it as processed
if (partition.getProgressState().isPresent()) {
AcknowledgementSet acknowledgementSet = createAcknowledgementSet(partition);
AcknowledgementSet acknowledgementSet = null;
if (sourceConfig.isAcknowledgments()) {
acknowledgementSet = createAcknowledgementSet(partition);
}
crawler.executePartition((SaasWorkerProgressState) partition.getProgressState().get(), buffer, acknowledgementSet);
}
sourceCoordinator.completePartition(partition);
Expand Down

0 comments on commit f825a27

Please sign in to comment.