Skip to content

Commit

Permalink
Handling end to end acknowledgement
Browse files Browse the repository at this point in the history
Signed-off-by: Santhosh Gandhe <[email protected]>
  • Loading branch information
san81 committed Jan 17, 2025
1 parent 83e949d commit bebc885
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
Expand Down Expand Up @@ -87,7 +88,7 @@ void injectObjectMapper(ObjectMapper objectMapper) {
@Override
public void executePartition(SaasWorkerProgressState state,
Buffer<Record<Event>> buffer,
CrawlerSourceConfig configuration) {
AcknowledgementSet acknowledgementSet) {
log.trace("Executing the partition: {} with {} ticket(s)",
state.getKeyAttributes(), state.getItemIds().size());
List<String> itemIds = state.getItemIds();
Expand Down Expand Up @@ -130,7 +131,9 @@ public void executePartition(SaasWorkerProgressState state,
.collect(Collectors.toList());

try {
recordsToWrite.forEach(eventRecord -> acknowledgementSet.add(eventRecord.getData()));
buffer.writeAll(recordsToWrite, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis());
acknowledgementSet.complete();
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;

Expand All @@ -46,26 +46,21 @@
@ExtendWith(MockitoExtension.class)
public class JiraClientTest {

private final PluginExecutorServiceProvider executorServiceProvider = new PluginExecutorServiceProvider();
@Mock
private Buffer<Record<Event>> buffer;

@Mock
private SaasWorkerProgressState saasWorkerProgressState;

@Mock
private CrawlerSourceConfig crawlerSourceConfig;

private AcknowledgementSet acknowledgementSet;
@Mock
private JiraSourceConfig jiraSourceConfig;

@Mock
private JiraService jiraService;

@Mock
private JiraIterator jiraIterator;

private PluginExecutorServiceProvider executorServiceProvider = new PluginExecutorServiceProvider();

@Test
void testConstructor() {
JiraClient jiraClient = new JiraClient(jiraService, jiraIterator, executorServiceProvider, jiraSourceConfig);
Expand Down Expand Up @@ -98,7 +93,7 @@ void testExecutePartition() throws Exception {

ArgumentCaptor<Collection<Record<Event>>> recordsCaptor = ArgumentCaptor.forClass((Class) Collection.class);

jiraClient.executePartition(saasWorkerProgressState, buffer, crawlerSourceConfig);
jiraClient.executePartition(saasWorkerProgressState, buffer, acknowledgementSet);

verify(buffer).writeAll(recordsCaptor.capture(), anyInt());
Collection<Record<Event>> capturedRecords = recordsCaptor.getValue();
Expand All @@ -121,14 +116,13 @@ void testExecutePartitionError() throws Exception {

when(jiraService.getIssue(anyString())).thenReturn("{\"id\":\"ID1\",\"key\":\"TEST-1\"}");

ArgumentCaptor<Collection<Record<Event>>> recordsCaptor = ArgumentCaptor.forClass((Class) Collection.class);

ObjectMapper mockObjectMapper = mock(ObjectMapper.class);
when(mockObjectMapper.readValue(any(String.class), any(TypeReference.class))).thenThrow(new JsonProcessingException("test") {
});
jiraClient.injectObjectMapper(mockObjectMapper);

assertThrows(RuntimeException.class, () -> jiraClient.executePartition(saasWorkerProgressState, buffer, crawlerSourceConfig));
assertThrows(RuntimeException.class, () -> jiraClient.executePartition(saasWorkerProgressState, buffer, acknowledgementSet));
}

@Test
Expand All @@ -147,6 +141,6 @@ void bufferWriteRuntimeTest() throws Exception {
ArgumentCaptor<Collection<Record<Event>>> recordsCaptor = ArgumentCaptor.forClass((Class) Collection.class);

doThrow(new RuntimeException()).when(buffer).writeAll(recordsCaptor.capture(), anyInt());
assertThrows(RuntimeException.class, () -> jiraClient.executePartition(saasWorkerProgressState, buffer, crawlerSourceConfig));
assertThrows(RuntimeException.class, () -> jiraClient.executePartition(saasWorkerProgressState, buffer, acknowledgementSet));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -61,8 +62,8 @@ public Instant crawl(Instant lastPollTime,
return Instant.ofEpochMilli(startTime);
}

public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, CrawlerSourceConfig sourceConfig) {
client.executePartition(state, buffer, sourceConfig);
/**
 * Executes one partition (a chunk of crawl work) by delegating to the underlying
 * {@code client}, forwarding the acknowledgement set used for end-to-end
 * acknowledgement of the records the client writes to the buffer.
 */
public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
client.executePartition(state, buffer, acknowledgementSet);
}

private void createPartition(List<ItemInfo> itemInfoList, EnhancedSourceCoordinator coordinator) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.base;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -36,9 +37,9 @@ public interface CrawlerClient {
/**
* Method for executing a particular partition or a chunk of work
*
* @param state worker node state holds the details of this particular chunk of work
* @param buffer pipeline buffer to write the results into
* @param sourceConfig pipeline configuration from the yaml
* @param state worker node state holds the details of this particular chunk of work
* @param buffer pipeline buffer to write the results into
* @param acknowledgementSet acknowledgement set to be used to track the completion of the partition
*/
void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, CrawlerSourceConfig sourceConfig);
void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet);
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public void start(Buffer<Record<Event>> buffer) {
this.executorService.submit(leaderScheduler);
// Register worker threads
for (int i = 0; i < sourceConfig.DEFAULT_NUMBER_OF_WORKERS; i++) {
WorkerScheduler workerScheduler = new WorkerScheduler(buffer, coordinator, sourceConfig, crawler);
WorkerScheduler workerScheduler = new WorkerScheduler(sourcePluginName, buffer, coordinator,
sourceConfig, crawler, pluginMetrics, acknowledgementSetManager);
this.executorService.submit(new Thread(workerScheduler));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
Expand All @@ -21,24 +25,41 @@
*/
public class WorkerScheduler implements Runnable {

public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses";
public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures";
private static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofSeconds(20);
private static final Logger log = LoggerFactory.getLogger(WorkerScheduler.class);
private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000;
private static final Duration DEFAULT_SLEEP_DURATION_MILLIS = Duration.ofMillis(10000);

private final EnhancedSourceCoordinator sourceCoordinator;
private final CrawlerSourceConfig sourceConfig;
private final Crawler crawler;
private final Buffer<Record<Event>> buffer;
private final PluginMetrics pluginMetrics;
private final AcknowledgementSetManager acknowledgementSetManager;
private final Counter acknowledgementSetSuccesses;
private final Counter acknowledgementSetFailures;
private final String sourcePluginName;
private final String SOURCE_PLUGIN_NAME = "sourcePluginName";


public WorkerScheduler(Buffer<Record<Event>> buffer,
public WorkerScheduler(final String sourcePluginName,
Buffer<Record<Event>> buffer,
EnhancedSourceCoordinator sourceCoordinator,
CrawlerSourceConfig sourceConfig,
Crawler crawler) {
Crawler crawler,
final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager) {
this.sourceCoordinator = sourceCoordinator;
this.sourceConfig = sourceConfig;
this.crawler = crawler;
this.buffer = buffer;
this.sourcePluginName = sourcePluginName;

this.acknowledgementSetManager = acknowledgementSetManager;
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetSuccesses = pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME, SOURCE_PLUGIN_NAME, sourcePluginName);
this.acknowledgementSetFailures = pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME, SOURCE_PLUGIN_NAME, sourcePluginName);
}

@Override
Expand All @@ -52,7 +73,7 @@ public void run() {
sourceCoordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE);
if (partition.isPresent()) {
// Process the partition (source extraction logic)
processPartition(partition.get(), buffer, sourceConfig);
processPartition(partition.get(), buffer);

} else {
log.debug("No partition available. This thread will sleep for {}", DEFAULT_SLEEP_DURATION_MILLIS);
Expand All @@ -75,13 +96,28 @@ public void run() {
log.warn("SourceItemWorker Scheduler is interrupted, looks like shutdown has triggered");
}

private void processPartition(EnhancedSourcePartition partition, Buffer<Record<Event>> buffer, CrawlerSourceConfig sourceConfig) {
/**
 * Processes a single acquired partition: when the partition carries progress
 * state, delegates the extraction work to the crawler together with a freshly
 * created acknowledgement set, then marks the partition complete.
 */
private void processPartition(EnhancedSourcePartition partition, Buffer<Record<Event>> buffer) {
// Only partitions carrying a SaasWorkerProgressState have extractable work.
if (partition.getProgressState().isPresent()) {
AcknowledgementSet acknowledgementSet = createAcknowledgementSet(partition);
crawler.executePartition((SaasWorkerProgressState) partition.getProgressState().get(), buffer, acknowledgementSet);
}
// NOTE(review): the partition is completed here unconditionally, while the
// acknowledgement-set success callback (see createAcknowledgementSet) ALSO
// calls sourceCoordinator.completePartition(partition) — confirm this double
// completion is intentional and that completePartition is idempotent.
sourceCoordinator.completePartition(partition);
}

/**
 * Creates an acknowledgement set for the given partition. The callback runs
 * when all acknowledgements arrive (or the set resolves): a positive result
 * increments the success counter and completes the partition; a negative
 * result increments the failure counter and gives the partition up
 * (presumably returning it to the coordinator for reprocessing — confirm).
 * The set resolves after ACKNOWLEDGEMENT_SET_TIMEOUT at the latest.
 */
private AcknowledgementSet createAcknowledgementSet(EnhancedSourcePartition partition) {
return acknowledgementSetManager.create((result) -> {
if (result) {
// All events were positively acknowledged downstream.
acknowledgementSetSuccesses.increment();
sourceCoordinator.completePartition(partition);
log.debug("acknowledgements received for partitionKey: {}", partition.getPartitionKey());
} else {
// Negative (or incomplete) acknowledgement: record the failure and
// release the partition instead of completing it.
acknowledgementSetFailures.increment();
log.debug("acknowledgements received with false for partitionKey: {}", partition.getPartitionKey());
sourceCoordinator.giveUpPartition(partition);
}
}, ACKNOWLEDGEMENT_SET_TIMEOUT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -32,7 +33,7 @@
@ExtendWith(MockitoExtension.class)
public class CrawlerTest {
@Mock
private CrawlerSourceConfig sourceConfig;
private AcknowledgementSet acknowledgementSet;

@Mock
private EnhancedSourceCoordinator coordinator;
Expand Down Expand Up @@ -60,8 +61,8 @@ public void crawlerConstructionTest() {

@Test
public void executePartitionTest() {
crawler.executePartition(state, buffer, sourceConfig);
verify(client).executePartition(state, buffer, sourceConfig);
crawler.executePartition(state, buffer, acknowledgementSet);
verify(client).executePartition(state, buffer, acknowledgementSet);
}

@Test
Expand Down
Loading

0 comments on commit bebc885

Please sign in to comment.