Add aggregate metrics for ddb source export and stream #3724

Merged: 2 commits, Nov 30, 2023
PluginMetrics.java
@@ -43,6 +43,16 @@ public static PluginMetrics fromNames(final String componentId, final String com
.add(componentId).toString());
}

/**
 * Provides a reference to APIs that register timers, counters, and gauges
 * into the global registry under the given prefix.
 *
 * @param metricsPrefix the prefix to apply to metric names
 * @return The {@link PluginMetrics}
 */
public static PluginMetrics fromPrefix(final String metricsPrefix) {
return new PluginMetrics(metricsPrefix);
}

private PluginMetrics(final String metricsPrefix) {
this.metricsPrefix = metricsPrefix;
}
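A quick usage sketch of the new factory method (illustrative only, not part of this diff; the prefix and counter name are made up, and the package is assumed; the registered-name behavior follows the test added below):

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;  // assumed package

// Metrics scoped by an arbitrary prefix instead of a pipeline/plugin-scoped name:
final PluginMetrics metrics = PluginMetrics.fromPrefix("dynamodb");
final Counter counter = metrics.counter("exportApiInvocations");
counter.increment();
// Registered as "dynamodb" + MetricNames.DELIMITER + "exportApiInvocations"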
PluginMetricsTest.java
@@ -15,6 +15,7 @@

import java.util.Collections;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyList;
@@ -41,6 +42,18 @@ void setUp() {
objectUnderTest = PluginMetrics.fromPluginSetting(pluginSetting);
}

@Test
public void testCounterWithMetricsPrefix() {

final String prefix = UUID.randomUUID().toString();

objectUnderTest = PluginMetrics.fromPrefix(prefix);
final Counter counter = objectUnderTest.counter("counter");
assertEquals(
prefix + MetricNames.DELIMITER + "counter",
counter.getId().getName());
}

@Test
public void testCounter() {
final Counter counter = objectUnderTest.counter("counter");
DynamoDBService.java
@@ -22,6 +22,7 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumerFactory;
import org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.BackoffCalculator;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@@ -54,6 +55,8 @@ public class DynamoDBService {

private final PluginMetrics pluginMetrics;

private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics;

private final AcknowledgementSetManager acknowledgementSetManager;


@@ -66,14 +69,15 @@ public DynamoDBService(final EnhancedSourceCoordinator coordinator,
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.dynamoDBSourceConfig = sourceConfig;
this.dynamoDBSourceAggregateMetrics = new DynamoDBSourceAggregateMetrics();

// Initialize AWS clients
dynamoDbClient = clientFactory.buildDynamoDBClient();
dynamoDbStreamsClient = clientFactory.buildDynamoDbStreamClient();
s3Client = clientFactory.buildS3Client();

// A shard manager is responsible to retrieve the shard information from streams.
-shardManager = new ShardManager(dynamoDbStreamsClient);
+shardManager = new ShardManager(dynamoDbStreamsClient, dynamoDBSourceAggregateMetrics);
tableConfigs = sourceConfig.getTableConfigs();
executor = Executors.newFixedThreadPool(4);
}
@@ -89,12 +93,12 @@ public void start(Buffer<Record<Event>> buffer) {

LOG.info("Start running DynamoDB service");
ManifestFileReader manifestFileReader = new ManifestFileReader(new S3ObjectReader(s3Client));
-Runnable exportScheduler = new ExportScheduler(coordinator, dynamoDbClient, manifestFileReader, pluginMetrics);
+Runnable exportScheduler = new ExportScheduler(coordinator, dynamoDbClient, manifestFileReader, pluginMetrics, dynamoDBSourceAggregateMetrics);

DataFileLoaderFactory loaderFactory = new DataFileLoaderFactory(coordinator, s3Client, pluginMetrics, buffer);
Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);

-ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, buffer);
+ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer);
Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, new BackoffCalculator(dynamoDBSourceConfig.getTableConfigs().get(0).getExportConfig() != null));
// leader scheduler will handle the initialization
Runnable leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, tableConfigs);
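Note: DynamoDBSourceAggregateMetrics is added by this PR but its file does not appear in this view. A minimal sketch of its likely shape, inferred from the call sites in this diff (the no-arg constructor and the four getters are confirmed by usage; the package comes from the import above; the prefix and registered counter names are assumptions):

package org.opensearch.dataprepper.plugins.source.dynamodb.utils;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;

public class DynamoDBSourceAggregateMetrics {

    // Assumed prefix and metric names; only the getter names are confirmed by this diff.
    private static final String SOURCE_PREFIX = "dynamodb";

    private final PluginMetrics pluginMetrics;
    private final Counter exportApiInvocations;
    private final Counter export5xxErrors;
    private final Counter streamApiInvocations;
    private final Counter stream5xxErrors;

    public DynamoDBSourceAggregateMetrics() {
        // fromPrefix (added in this PR) keeps these counters source-wide instead of per pipeline.
        this.pluginMetrics = PluginMetrics.fromPrefix(SOURCE_PREFIX);
        this.exportApiInvocations = pluginMetrics.counter("exportApiInvocations");
        this.export5xxErrors = pluginMetrics.counter("export5xxErrors");
        this.streamApiInvocations = pluginMetrics.counter("streamApiInvocations");
        this.stream5xxErrors = pluginMetrics.counter("stream5xxErrors");
    }

    public Counter getExportApiInvocations() { return exportApiInvocations; }

    public Counter getExport5xxErrors() { return export5xxErrors; }

    public Counter getStreamApiInvocations() { return streamApiInvocations; }

    public Counter getStream5xxErrors() { return stream5xxErrors; }
}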
DataFileScheduler.java
@@ -112,6 +112,7 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) {
} else {
runLoader.whenComplete((v, ex) -> {
if (ex != null) {
LOG.error("There was an exception while processing an S3 data file: {}", ex);
coordinator.giveUpPartition(dataFilePartition);
}
numOfWorkers.decrementAndGet();
ExportScheduler.java
@@ -16,6 +16,7 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.ExportProgressState;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.ExportSummary;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.LoadStatus;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@@ -74,11 +75,15 @@ public class ExportScheduler implements Runnable {
private final Counter exportS3ObjectsTotalCounter;
private final Counter exportRecordsTotalCounter;

-public ExportScheduler(EnhancedSourceCoordinator enhancedSourceCoordinator, DynamoDbClient dynamoDBClient, ManifestFileReader manifestFileReader, PluginMetrics pluginMetrics) {
+public ExportScheduler(final EnhancedSourceCoordinator enhancedSourceCoordinator,
+                       final DynamoDbClient dynamoDBClient,
+                       final ManifestFileReader manifestFileReader,
+                       final PluginMetrics pluginMetrics,
+                       final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics) {
this.enhancedSourceCoordinator = enhancedSourceCoordinator;
this.dynamoDBClient = dynamoDBClient;
this.pluginMetrics = pluginMetrics;
-this.exportTaskManager = new ExportTaskManager(dynamoDBClient);
+this.exportTaskManager = new ExportTaskManager(dynamoDBClient, dynamoDBSourceAggregateMetrics);

this.manifestFileReader = manifestFileReader;
executor = Executors.newCachedThreadPool();
@@ -213,7 +218,7 @@ private void createDataFilePartitions(final String exportArn,

private void closeExportPartitionWithError(ExportPartition exportPartition) {
LOG.error("The export from DynamoDb to S3 failed, it will be retried");
-exportJobFailureCounter.increment(1);
+exportJobFailureCounter.increment();
ExportProgressState exportProgressState = exportPartition.getProgressState().get();
// Clear current Arn, so that a new export can be submitted.
exportProgressState.setExportArn(null);
ExportTaskManager.java
@@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.export;

import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
@@ -14,6 +15,7 @@
import software.amazon.awssdk.services.dynamodb.model.ExportFormat;
import software.amazon.awssdk.services.dynamodb.model.ExportTableToPointInTimeRequest;
import software.amazon.awssdk.services.dynamodb.model.ExportTableToPointInTimeResponse;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
import software.amazon.awssdk.services.dynamodb.model.S3SseAlgorithm;

import java.time.Instant;
@@ -25,10 +27,12 @@ public class ExportTaskManager {
private static final ExportFormat DEFAULT_EXPORT_FORMAT = ExportFormat.ION;

private final DynamoDbClient dynamoDBClient;
private final DynamoDBSourceAggregateMetrics dynamoAggregateMetrics;


-public ExportTaskManager(DynamoDbClient dynamoDBClient) {
+public ExportTaskManager(final DynamoDbClient dynamoDBClient,
+                         final DynamoDBSourceAggregateMetrics dynamoAggregateMetrics) {
this.dynamoDBClient = dynamoDBClient;
this.dynamoAggregateMetrics = dynamoAggregateMetrics;
}

public String submitExportJob(String tableArn, String bucket, String prefix, String kmsKeyId, Instant exportTime) {
@@ -46,12 +50,17 @@ public String submitExportJob(String tableArn, String bucket, String prefix, Str


try {
dynamoAggregateMetrics.getExportApiInvocations().increment();
ExportTableToPointInTimeResponse response = dynamoDBClient.exportTableToPointInTime(req);

String exportArn = response.exportDescription().exportArn();
String status = response.exportDescription().exportStatusAsString();
LOG.debug("Export Job submitted with ARN {} and status {}", exportArn, status);
return exportArn;
} catch (final InternalServerErrorException e) {
dynamoAggregateMetrics.getExport5xxErrors().increment();
[Review thread on the line above]
Member: Should we also consider adding a pipeline-specific metric?
Member Author: I don't think there's any value in that for these metrics, at least. There may be other pipeline-specific metrics we can add, but that can be a future PR.
LOG.error("Failed to submit an export job with error: {}", e.getMessage());
return null;
} catch (SdkException e) {
LOG.error("Failed to submit an export job with error " + e.getMessage());
return null;
@@ -64,11 +73,15 @@ public String getExportManifest(String exportArn) {

String manifestKey = null;
try {
dynamoAggregateMetrics.getExportApiInvocations().increment();
DescribeExportResponse resp = dynamoDBClient.describeExport(request);
manifestKey = resp.exportDescription().exportManifest();

} catch (final InternalServerErrorException e) {
dynamoAggregateMetrics.getExport5xxErrors().increment();
LOG.error("Unable to get manifest file for export {}: {}", exportArn, e.getMessage());
} catch (SdkException e) {
LOG.error("Unable to get manifest file for export " + exportArn);
LOG.error("Unable to get manifest file for export {}: {}", exportArn, e.getMessage());
}
return manifestKey;
}
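A hedged test sketch for the new error accounting in submitExportJob (not part of this PR; the test name and mock wiring are assumptions, using JUnit 5 and Mockito):

// Assumes the usual static imports: org.mockito.Mockito.{mock, when, verify},
// org.mockito.ArgumentMatchers.any, org.junit.jupiter.api.Assertions.assertNull.
@Test
void submitExportJobIncrementsExport5xxErrorsOnInternalServerError() {
    final DynamoDbClient dynamoDbClient = mock(DynamoDbClient.class);
    final DynamoDBSourceAggregateMetrics dynamoAggregateMetrics = mock(DynamoDBSourceAggregateMetrics.class);
    final Counter exportApiInvocations = mock(Counter.class);
    final Counter export5xxErrors = mock(Counter.class);
    when(dynamoAggregateMetrics.getExportApiInvocations()).thenReturn(exportApiInvocations);
    when(dynamoAggregateMetrics.getExport5xxErrors()).thenReturn(export5xxErrors);
    when(dynamoDbClient.exportTableToPointInTime(any(ExportTableToPointInTimeRequest.class)))
            .thenThrow(InternalServerErrorException.builder().message("internal error").build());

    final ExportTaskManager objectUnderTest = new ExportTaskManager(dynamoDbClient, dynamoAggregateMetrics);
    final String exportArn = objectUnderTest.submitExportJob("tableArn", "bucket", "prefix", null, Instant.now());

    // Submission fails, but the API call and the 5xx error are both counted.
    assertNull(exportArn);
    verify(exportApiInvocations).increment();
    verify(export5xxErrors).increment();
}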
ShardManager.java
@@ -1,9 +1,11 @@
package org.opensearch.dataprepper.plugins.source.dynamodb.leader;

import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;

@@ -43,10 +45,13 @@ public class ShardManager {


private final DynamoDbStreamsClient streamsClient;
private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics;


-public ShardManager(final DynamoDbStreamsClient streamsClient) {
+public ShardManager(final DynamoDbStreamsClient streamsClient,
+                    final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics) {
this.streamsClient = streamsClient;
this.dynamoDBSourceAggregateMetrics = dynamoDBSourceAggregateMetrics;
streamMap = new HashMap<>();
endingSequenceNumberMap = new HashMap<>();
}
@@ -148,22 +153,30 @@ private List<Shard> listShards(String streamArn, String lastEvaluatedShardId) {
long startTime = System.currentTimeMillis();
// Get all the shard IDs from the stream.
List<Shard> shards = new ArrayList<>();
-do {
-    DescribeStreamRequest req = DescribeStreamRequest.builder()
-            .streamArn(streamArn)
-            .limit(MAX_SHARD_COUNT)
-            .exclusiveStartShardId(lastEvaluatedShardId)
-            .build();
-
-    DescribeStreamResponse describeStreamResult = streamsClient.describeStream(req);
-    shards.addAll(describeStreamResult.streamDescription().shards());
-
-    // If LastEvaluatedShardId is set,
-    // at least one more page of shard IDs to retrieve
-    lastEvaluatedShardId = describeStreamResult.streamDescription().lastEvaluatedShardId();
-
-} while (lastEvaluatedShardId != null);
+try {
+    do {
+        DescribeStreamRequest req = DescribeStreamRequest.builder()
+                .streamArn(streamArn)
+                .limit(MAX_SHARD_COUNT)
+                .exclusiveStartShardId(lastEvaluatedShardId)
+                .build();
+
+        dynamoDBSourceAggregateMetrics.getStreamApiInvocations().increment();
+        DescribeStreamResponse describeStreamResult = streamsClient.describeStream(req);
+        shards.addAll(describeStreamResult.streamDescription().shards());
+
+        // If LastEvaluatedShardId is set,
+        // at least one more page of shard IDs to retrieve
+        lastEvaluatedShardId = describeStreamResult.streamDescription().lastEvaluatedShardId();
+
+    } while (lastEvaluatedShardId != null);
+} catch(final InternalServerErrorException e) {
+    LOG.error("Received an internal server exception from DynamoDB while listing shards: {}", e.getMessage());
+    dynamoDBSourceAggregateMetrics.getStream5xxErrors().increment();
+    return shards;
[Review thread on "return shards;" above]
Contributor: Should we rethrow the error here instead of potentially returning partial shards?
Member Author: I don't think there's anything wrong with returning partial shards; it just means we will only check those shards for child shards. The next time we run describeStream, we will still go through the same shards again and create child shards for them (even the ones we return here) until 24 hours is up (when the shard would expire).

+}

long endTime = System.currentTimeMillis();
LOG.info("Listing shards (DescribeStream call) took {} milliseconds with {} shards found", endTime - startTime, shards.size());
ShardConsumer.java
@@ -13,10 +13,12 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;

import java.time.Duration;
@@ -101,6 +103,8 @@ public class ShardConsumer implements Runnable {

private final String shardId;

private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics;

private long recordsWrittenToBuffer;

private ShardConsumer(Builder builder) {
@@ -117,10 +121,14 @@ private ShardConsumer(Builder builder) {
this.shardAcknowledgmentTimeout = builder.dataFileAcknowledgmentTimeout;
this.shardId = builder.shardId;
this.recordsWrittenToBuffer = 0;
this.dynamoDBSourceAggregateMetrics = builder.dynamoDBSourceAggregateMetrics;
}

-public static Builder builder(final DynamoDbStreamsClient dynamoDbStreamsClient, final PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) {
-    return new Builder(dynamoDbStreamsClient, pluginMetrics, buffer);
+public static Builder builder(final DynamoDbStreamsClient dynamoDbStreamsClient,
+                              final PluginMetrics pluginMetrics,
+                              final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics,
+                              final Buffer<Record<Event>> buffer) {
+    return new Builder(dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer);
}


@@ -130,6 +138,8 @@ static class Builder {

private final PluginMetrics pluginMetrics;

private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics;

private final Buffer<Record<Event>> buffer;

private TableInfo tableInfo;
@@ -149,9 +159,13 @@ static class Builder {
private AcknowledgementSet acknowledgementSet;
private Duration dataFileAcknowledgmentTimeout;

-public Builder(final DynamoDbStreamsClient dynamoDbStreamsClient, final PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) {
+public Builder(final DynamoDbStreamsClient dynamoDbStreamsClient,
+               final PluginMetrics pluginMetrics,
+               final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics,
+               final Buffer<Record<Event>> buffer) {
this.dynamoDbStreamsClient = dynamoDbStreamsClient;
this.pluginMetrics = pluginMetrics;
this.dynamoDBSourceAggregateMetrics = dynamoDBSourceAggregateMetrics;
this.buffer = buffer;
}

@@ -303,9 +317,13 @@ private GetRecordsResponse callGetRecords(String shardIterator) {
.build();

try {
dynamoDBSourceAggregateMetrics.getStreamApiInvocations().increment();
GetRecordsResponse response = dynamoDbStreamsClient.getRecords(req);
return response;
-} catch (Exception e) {
+} catch(final InternalServerErrorException ex) {
+    dynamoDBSourceAggregateMetrics.getStream5xxErrors().increment();
+    throw new RuntimeException(ex.getMessage());
+} catch (final Exception e) {
throw new RuntimeException(e.getMessage());
}

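For context, a hedged sketch of how a consumer might now be assembled with the shared aggregate metrics (only the builder(...) signature above comes from this diff; the fluent setters and build() call are hypothetical):

// In DynamoDBService, a single DynamoDBSourceAggregateMetrics instance is shared
// across the export and stream paths; ShardConsumerFactory receives it and passes it on.
final ShardConsumer shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer)
        .tableInfo(tableInfo)   // hypothetical fluent setter
        .shardId(shardId)       // hypothetical fluent setter
        .build();               // hypothetical terminal call
executorService.submit(shardConsumer);  // ShardConsumer implements Runnable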