Skip to content

Commit

Permalink
Neptune Zero-ETL CDC Initial Implementation (#5308)
Browse files Browse the repository at this point in the history
Initial set up for Neptune Zero-ETL support

Signed-off-by: Amanda Xiang <[email protected]>

---------

Signed-off-by: Amanda Xiang <[email protected]>
Co-authored-by: Mohamed Elzarei <[email protected]>
Co-authored-by: Halvard Morstad <[email protected]>
Co-authored-by: Mohamed Elzarei <[email protected]>
Co-authored-by: Amanda Xiang <[email protected]>
  • Loading branch information
5 people authored Jan 7, 2025
1 parent 708843c commit 0fe3f3a
Show file tree
Hide file tree
Showing 55 changed files with 4,324 additions and 1 deletion.
32 changes: 32 additions & 0 deletions data-prepper-plugins/neptune-source/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
dependencies {
    implementation project(path: ':data-prepper-api')

    implementation libs.commons.lang3
    implementation 'io.micrometer:micrometer-core'
    // Pinned to a concrete release: "4.5.x" is not a Gradle version selector and would be
    // looked up as a literal (non-existent) version.
    implementation 'org.apache.httpcomponents:httpclient:4.5.14'
    implementation 'com.amazonaws:amazon-neptune-sigv4-signer:3.0.1'
    implementation 'org.eclipse.rdf4j:rdf4j-rio-nquads:5.0.2'
    implementation 'software.amazon.awssdk:neptunedata'
    implementation 'software.amazon.awssdk:sts'
    implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
    compileOnly 'org.projectlombok:lombok:1.18.22'
    annotationProcessor 'org.projectlombok:lombok:1.18.22'

    implementation 'com.fasterxml.jackson.core:jackson-core'
    implementation 'com.fasterxml.jackson.core:jackson-databind'
    implementation 'software.amazon.awssdk:s3'

    implementation project(path: ':data-prepper-plugins:aws-plugin-api')
    implementation project(path: ':data-prepper-plugins:buffer-common')
    implementation project(path: ':data-prepper-plugins:http-common')
    implementation project(path: ':data-prepper-plugins:common')

    testImplementation testLibs.bundles.junit
    testImplementation testLibs.slf4j.simple
    testImplementation project(path: ':data-prepper-test-common')

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.neptune;

import com.amazonaws.neptune.auth.NeptuneSigV4SignerException;
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.neptune.configuration.NeptuneSourceConfig;
import org.opensearch.dataprepper.plugins.source.neptune.leader.LeaderScheduler;
import org.opensearch.dataprepper.plugins.source.neptune.s3partition.S3PartitionCreatorScheduler;
import org.opensearch.dataprepper.plugins.source.neptune.stream.StreamScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Owns the background schedulers of the Neptune source.
 * - reads stream data from the Neptune Stream HTTP endpoints
 * - export (initial load) is to be implemented later
 */
public class NeptuneService {
    private static final Logger LOG = LoggerFactory.getLogger(NeptuneService.class);

    private static final String S3_PATH_DELIMITER = "/";

    private final EnhancedSourceCoordinator sourceCoordinator;
    private final PluginMetrics pluginMetrics;
    private final NeptuneSourceConfig sourceConfig;
    private final AcknowledgementSetManager acknowledgementSetManager;

    // Created in start(); stays null until then, which shutdown() relies on.
    private ExecutorService executor;

    /**
     * Stores the collaborators; no work is started until {@link #start(Buffer)} is called.
     *
     * @param sourceCoordinator         coordinator shared by all schedulers
     * @param sourceConfig              source plugin configuration
     * @param pluginMetrics             metrics registry handed to the stream scheduler
     * @param acknowledgementSetManager acknowledgement manager handed to the stream scheduler
     * @throws NeptuneSigV4SignerException declared for callers; not raised by the visible body
     */
    public NeptuneService(final EnhancedSourceCoordinator sourceCoordinator,
                          final NeptuneSourceConfig sourceConfig,
                          final PluginMetrics pluginMetrics,
                          final AcknowledgementSetManager acknowledgementSetManager) throws NeptuneSigV4SignerException {
        this.sourceCoordinator = sourceCoordinator;
        this.sourceConfig = sourceConfig;
        this.pluginMetrics = pluginMetrics;
        this.acknowledgementSetManager = acknowledgementSetManager;
    }

    /**
     * Starts three long-running schedulers on a dedicated three-thread pool:
     * - Leader Scheduler
     * - S3 Partition Creator Scheduler
     * - Stream Scheduler
     * Each thread is responsible for one type of job. Only the stream scheduler
     * is given the {@link Buffer}.
     *
     * @param buffer Data Prepper Buffer
     */
    public void start(Buffer<Record<Event>> buffer) {
        LOG.info("Start running Neptune service");

        // The same prefix is shared by the leader and stream schedulers.
        final String pathPrefix = getS3PathPrefix();

        final LeaderScheduler leader =
                new LeaderScheduler(sourceCoordinator, sourceConfig, pathPrefix);
        final S3PartitionCreatorScheduler partitionCreator =
                new S3PartitionCreatorScheduler(sourceCoordinator);
        final StreamScheduler stream = new StreamScheduler(
                sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, pathPrefix, pluginMetrics);

        executor = Executors.newFixedThreadPool(
                3, BackgroundThreadFactory.defaultExecutorThreadFactory("neptune-source"));
        executor.submit(leader);
        executor.submit(partitionCreator);
        executor.submit(stream);
    }

    /**
     * Builds the S3 key prefix as
     * {@code [<configured prefix>/][<coordinator partition prefix>/]<current epoch millis>/}.
     * The configured prefix is skipped when null or blank, and the partition prefix when null.
     */
    private String getS3PathPrefix() {
        final StringBuilder prefix = new StringBuilder();

        final String configuredPrefix = sourceConfig.getS3Prefix();
        if (configuredPrefix != null && !configuredPrefix.isBlank()) {
            prefix.append(configuredPrefix).append(S3_PATH_DELIMITER);
        }

        final long nowMillis = Instant.now().toEpochMilli();
        if (sourceCoordinator.getPartitionPrefix() != null) {
            prefix.append(sourceCoordinator.getPartitionPrefix()).append(S3_PATH_DELIMITER);
        }
        prefix.append(nowMillis).append(S3_PATH_DELIMITER);

        return prefix.toString();
    }

    /**
     * Interrupts the running schedulers, if any were started.
     * Each scheduler must implement its own logic for shutting down gracefully.
     */
    public void shutdown() {
        if (executor == null) {
            // start() was never called, so there is nothing to stop.
            return;
        }
        LOG.info("shutdown Neptune Service scheduler and worker");
        executor.shutdownNow();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.neptune;

import com.amazonaws.neptune.auth.NeptuneSigV4SignerException;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.UsesEnhancedSourceCoordination;
import org.opensearch.dataprepper.plugins.source.neptune.configuration.NeptuneSourceConfig;
import org.opensearch.dataprepper.plugins.source.neptune.coordination.PartitionFactory;
import org.opensearch.dataprepper.plugins.source.neptune.coordination.partition.LeaderPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.function.Function;


/**
 * Data Prepper source plugin registered under the name {@code neptune}.
 * It creates the leader partition on start and delegates the actual work to a
 * {@link NeptuneService}.
 */
@DataPrepperPlugin(name = "neptune", pluginType = Source.class, pluginConfigurationType = NeptuneSourceConfig.class)
public class NeptuneSource implements Source<Record<Event>>, UsesEnhancedSourceCoordination {
    private static final Logger LOG = LoggerFactory.getLogger(NeptuneSource.class);

    private final PluginMetrics pluginMetrics;
    private final NeptuneSourceConfig sourceConfig;
    private final PluginConfigObservable pluginConfigObservable;
    // Injected through setEnhancedSourceCoordinator before start() is called.
    private EnhancedSourceCoordinator sourceCoordinator;
    private final AcknowledgementSetManager acknowledgementSetManager;
    // Created in start(); null until then.
    private NeptuneService neptuneService;

    private final boolean acknowledgementsEnabled;

    /**
     * @param pluginMetrics             metrics registry passed on to the service
     * @param sourceConfig              plugin configuration; also supplies the acknowledgements flag
     * @param acknowledgementSetManager acknowledgement manager passed on to the service
     * @param pluginConfigObservable    stored, but not otherwise used by this class
     */
    @DataPrepperPluginConstructor
    public NeptuneSource(final PluginMetrics pluginMetrics,
                         final NeptuneSourceConfig sourceConfig,
                         final AcknowledgementSetManager acknowledgementSetManager,
                         final PluginConfigObservable pluginConfigObservable) {
        this.pluginMetrics = pluginMetrics;
        this.sourceConfig = sourceConfig;
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.pluginConfigObservable = pluginConfigObservable;
        this.acknowledgementsEnabled = sourceConfig.isAcknowledgments();
    }

    /**
     * Creates the leader partition and starts the Neptune service.
     *
     * @param buffer buffer the service writes events to
     * @throws NullPointerException  if no source coordinator has been set
     * @throws IllegalStateException if the service cannot be created because of a SigV4 signer error
     */
    @Override
    public void start(final Buffer<Record<Event>> buffer) {
        Objects.requireNonNull(sourceCoordinator, "sourceCoordinator must be set before start");
        sourceCoordinator.createPartition(new LeaderPartition());

        try {
            neptuneService = new NeptuneService(sourceCoordinator, sourceConfig, pluginMetrics, acknowledgementSetManager);
            LOG.info("Start Neptune service");
            neptuneService.start(buffer);
        } catch (NeptuneSigV4SignerException e) {
            // Swallowing this would leave the source looking started while nothing is running,
            // so log the cause and fail fast.
            LOG.error("SigV4 ERROR: failed to start Neptune service", e);
            throw new IllegalStateException("Failed to start Neptune service due to a SigV4 signer error", e);
        }
    }


    /**
     * Shuts the service down if it was started.
     */
    @Override
    public void stop() {
        LOG.info("Stop Neptune service");
        if (Objects.nonNull(neptuneService)) {
            neptuneService.shutdown();
        }
    }

    /**
     * Stores the coordinator and initializes it immediately.
     *
     * @param sourceCoordinator coordinator used by this source and its schedulers
     */
    @Override
    public void setEnhancedSourceCoordinator(EnhancedSourceCoordinator sourceCoordinator) {
        this.sourceCoordinator = sourceCoordinator;
        this.sourceCoordinator.initialize();
    }

    /**
     * @return a new {@link PartitionFactory}
     */
    @Override
    public Function<SourcePartitionStoreItem, EnhancedSourcePartition> getPartitionFactory() {
        return new PartitionFactory();
    }

    /**
     * @return the acknowledgements flag read from the source configuration at construction time
     */
    @Override
    public boolean areAcknowledgementsEnabled() {
        return acknowledgementsEnabled;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.neptune.buffer;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * Writes events to the Data Prepper buffer through a {@link BufferAccumulator}
 * and tracks success/error counts in plugin metrics.
 */
public class RecordBufferWriter {
    private static final Logger LOG = LoggerFactory.getLogger(RecordBufferWriter.class);
    static final String RECORDS_PROCESSED_COUNT = "recordsProcessed";
    static final String RECORDS_PROCESSING_ERROR_COUNT = "recordProcessingErrors";
    private final BufferAccumulator<Record<Event>> bufferAccumulator;
    private final Counter recordSuccessCounter;
    private final Counter recordErrorCounter;

    private RecordBufferWriter(final BufferAccumulator<Record<Event>> bufferAccumulator,
                               final PluginMetrics pluginMetrics) {
        this.bufferAccumulator = bufferAccumulator;
        this.recordSuccessCounter = pluginMetrics.counter(RECORDS_PROCESSED_COUNT);
        this.recordErrorCounter = pluginMetrics.counter(RECORDS_PROCESSING_ERROR_COUNT);
    }

    /**
     * Static factory for the writer.
     *
     * @param bufferAccumulator accumulator the events are added to and flushed through
     * @param pluginMetrics     registry the success and error counters are created from
     * @return a new writer
     */
    public static RecordBufferWriter create(final BufferAccumulator<Record<Event>> bufferAccumulator,
                                            final PluginMetrics pluginMetrics) {
        return new RecordBufferWriter(bufferAccumulator, pluginMetrics);
    }

    /**
     * Flushes the underlying accumulator.
     *
     * @throws Exception if the flush fails
     */
    void flushBuffer() throws Exception {
        bufferAccumulator.flush();
    }

    /**
     * Add event record to buffer
     *
     * @param acknowledgementSet acknowledgmentSet keeps track of set of events; may be null,
     *                           in which case the event is not tracked
     * @param record             record to be written to buffer
     * @throws Exception Exception if failed to write to buffer.
     */
    public void addToBuffer(final AcknowledgementSet acknowledgementSet,
                            final Event record) throws Exception {
        if (acknowledgementSet != null) {
            acknowledgementSet.add(record);
        }

        bufferAccumulator.add(new Record<>(record));
    }

    /**
     * Adds every event to the buffer and then flushes once.
     * Failures are logged and counted rather than propagated: events that could not be added
     * are counted as errors immediately, and the successfully added events are counted as
     * processed or as errors depending on whether the final flush succeeds.
     *
     * @param acknowledgementSet acknowledgement set the events are registered with; may be null
     * @param records            events to write
     */
    public void writeToBuffer(final AcknowledgementSet acknowledgementSet,
                              final List<Event> records) {

        int eventCount = 0;
        int failedCount = 0;
        for (final Event record : records) {
            try {
                addToBuffer(acknowledgementSet, record);
                eventCount++;
            } catch (Exception e) {
                failedCount++;
                // Message only: one stack trace per failed event could flood the logs.
                LOG.error("Failed to add event to buffer due to {}", e.getMessage());
            }
        }

        if (failedCount > 0) {
            // Events that never reached the accumulator must still show up in the error metric.
            recordErrorCounter.increment(failedCount);
        }

        try {
            flushBuffer();
            recordSuccessCounter.increment(eventCount);
        } catch (Exception e) {
            // A trailing Throwable argument makes SLF4J log the stack trace as well.
            LOG.error("Failed to write {} events to buffer due to {}", eventCount, e.getMessage(), e);
            recordErrorCounter.increment(eventCount);
        }
    }
}
Loading

0 comments on commit 0fe3f3a

Please sign in to comment.