Add bug fixes and improvements to DDB source #3559

Merged · 2 commits · Oct 31, 2023
Changes from all commits
1 change: 1 addition & 0 deletions data-prepper-plugins/dynamodb-source/build.gradle
@@ -31,6 +31,7 @@ dependencies {

testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
testImplementation testLibs.mockito.inline
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
}

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java
@@ -5,12 +5,21 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.export;

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
@@ -37,30 +46,37 @@ public class DataFileLoader implements Runnable {
*/
private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 2 * 60_000;

static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
Member:
How were these numbers chosen?

Contributor Author:
Neither number makes much difference. There will be retries after a timeout anyway; 1,000 is the batch size we use for reading files and reading stream data, so we use the same value here.
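For context, a minimal sketch of how these two constants drive the batching (assuming BufferAccumulator's add()/flush() behave as in other Data Prepper sources; convertLineToRecord is a hypothetical stand-in for the converter, not part of the PR):

import java.time.Duration;
import java.util.List;

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

class AccumulatorSketch {
    static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
    static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;

    static void writeLines(Buffer<Record<Event>> buffer, List<String> lines) throws Exception {
        // Batches up to 1,000 records per write into the Data Prepper buffer;
        // each write blocks for at most BUFFER_TIMEOUT before failing, after
        // which the caller retries.
        BufferAccumulator<Record<Event>> accumulator =
                BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
        for (String line : lines) {
            accumulator.add(convertLineToRecord(line)); // flushes automatically at the batch size
        }
        accumulator.flush(); // push any trailing partial batch
    }

    // Hypothetical helper; in the PR, ExportRecordConverter does the real conversion.
    static Record<Event> convertLineToRecord(String line) {
        throw new UnsupportedOperationException("illustrative placeholder");
    }
}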


private final String bucketName;

private final String key;

private final ExportRecordConverter recordConverter;

private final S3ObjectReader s3ObjectReader;
private final S3ObjectReader objectReader;

private final DataFileCheckpointer checkpointer;

// Start Line is the checkpoint
/**
* Start Line is the checkpoint
*/
private final int startLine;

private DataFileLoader(Builder builder) {
this.s3ObjectReader = builder.s3ObjectReader;
this.recordConverter = builder.recordConverter;
this.objectReader = builder.objectReader;
this.bucketName = builder.bucketName;
this.key = builder.key;
this.checkpointer = builder.checkpointer;
this.startLine = builder.startLine;

final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(builder.buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
recordConverter = new ExportRecordConverter(bufferAccumulator, builder.tableInfo, builder.pluginMetrics);
}

public static Builder builder() {
return new Builder();
public static Builder builder(final S3ObjectReader s3ObjectReader, final PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) {
return new Builder(s3ObjectReader, pluginMetrics, buffer);
}


@@ -69,9 +85,14 @@ public static Builder builder() {
*/
static class Builder {

private S3ObjectReader s3ObjectReader;
private final S3ObjectReader objectReader;

private final PluginMetrics pluginMetrics;

private final Buffer<Record<Event>> buffer;

private TableInfo tableInfo;

private ExportRecordConverter recordConverter;

private DataFileCheckpointer checkpointer;

@@ -81,13 +102,14 @@ static class Builder {

private int startLine;

public Builder s3ObjectReader(S3ObjectReader s3ObjectReader) {
this.s3ObjectReader = s3ObjectReader;
return this;
public Builder(final S3ObjectReader objectReader, final PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) {
this.objectReader = objectReader;
this.pluginMetrics = pluginMetrics;
this.buffer = buffer;
}

public Builder recordConverter(ExportRecordConverter recordConverter) {
this.recordConverter = recordConverter;
public Builder tableInfo(TableInfo tableInfo) {
this.tableInfo = tableInfo;
return this;
}
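A note on the refactor above: moving S3ObjectReader, PluginMetrics, and Buffer into the Builder constructor makes the required collaborators impossible to omit, while optional settings remain fluent. A generic sketch of that pattern (names are illustrative, not from the PR):

class Report {
    private final String title; // required
    private final int copies;   // optional, defaulted

    private Report(Builder builder) {
        this.title = builder.title;
        this.copies = builder.copies;
    }

    static class Builder {
        private final String title;
        private int copies = 1;

        Builder(String title) { this.title = title; }            // required up front
        Builder copies(int copies) { this.copies = copies; return this; } // optional, fluent
        Report build() { return new Report(this); }
    }
}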

@@ -128,7 +150,9 @@ public void run() {
int lineCount = 0;
int lastLineProcessed = 0;

try (GZIPInputStream gzipInputStream = new GZIPInputStream(s3ObjectReader.readFile(bucketName, key))) {
try {
InputStream inputStream = objectReader.readFile(bucketName, key);
Member:
nit:
try (
  InputStream inputStream = objectReader.readFile(bucketName, key);
  GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream);
  BufferedReader reader = new BufferedReader(new InputStreamReader(gzipInputStream));
) {
..
}

Contributor Author:
I didn't know we could do that. I thought it could only be one resource.
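For the record, try-with-resources has accepted multiple resources since Java 7, closing them in reverse declaration order. A self-contained illustration (the file path is made up):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPInputStream;

public class MultiResourceTryDemo {
    public static void main(String[] args) throws IOException {
        // All three resources are closed automatically, in reverse order:
        // reader first, then gzip, then the raw stream.
        try (InputStream in = Files.newInputStream(Path.of("export.json.gz"));
             GZIPInputStream gzip = new GZIPInputStream(in);
             BufferedReader reader = new BufferedReader(new InputStreamReader(gzip))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}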

GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream);
BufferedReader reader = new BufferedReader(new InputStreamReader(gzipInputStream));

String line;
@@ -170,11 +194,15 @@ public void run() {
}

lines.clear();

reader.close();
gzipInputStream.close();
inputStream.close();
LOG.info("Complete loading s3://{}/{}", bucketName, key);
} catch (Exception e) {
} catch (IOException e) {
Member:
Why is this exception scoped down? How are we handling other exceptions?

Contributor Author (@daixba, Nov 1, 2023):
You're right. There may be other exceptions. Since this is merged, I will update this in the next PR.
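The broader handling the author has in mind for the follow-up might look roughly like this (a sketch with a hypothetical stand-in for DataFileCheckpointer; not the merged code):

import java.io.IOException;

class WidenedCatchSketch {
    // Hypothetical stand-in for the real DataFileCheckpointer.
    interface Checkpointer { void checkpoint(int lineCount); }

    static void load(Checkpointer checkpointer, String bucketName, String key) {
        int lineCount = 0;
        try {
            // ... stream, decompress, and process the export file ...
            throw new IOException("simulated read failure");
        } catch (Exception e) { // widened from IOException, per the review
            checkpointer.checkpoint(lineCount); // record progress before failing
            String errorMessage = String.format(
                    "Loading of s3://%s/%s failed: %s", bucketName, key, e.getMessage());
            throw new RuntimeException(errorMessage, e); // preserve the cause
        }
    }
}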

checkpointer.checkpoint(lineCount);
String errorMessage = String.format("Loading of s3://{}/{} completed with Exception: {}", bucketName, key, e.getMessage());

String errorMessage = String.format("Loading of s3://%s/%s completed with Exception: %s", bucketName, key, e.getMessage());
throw new RuntimeException(errorMessage);
}
}
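As an aside on the fix itself: the old message used SLF4J-style {} placeholders, which String.format does not substitute. A minimal demonstration:

public class FormatPlaceholderDemo {
    public static void main(String[] args) {
        // SLF4J's {} placeholders are literal text to String.format, so the
        // old code produced "s3://{}/{}" verbatim; %s performs the substitution.
        String broken = String.format("Loading of s3://{}/{} failed: {}", "bucket", "key", "boom");
        String fixed = String.format("Loading of s3://%s/%s failed: %s", "bucket", "key", "boom");
        System.out.println(broken); // Loading of s3://{}/{} failed: {}
        System.out.println(fixed);  // Loading of s3://bucket/key failed: boom
    }
}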
data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java
@@ -5,25 +5,19 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.export;

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import software.amazon.awssdk.services.s3.S3Client;

import java.time.Duration;

/**
* Factory class for DataFileLoader thread.
*/
public class DataFileLoaderFactory {
static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;

private final EnhancedSourceCoordinator coordinator;

@@ -39,17 +33,14 @@ public DataFileLoaderFactory(EnhancedSourceCoordinator coordinator, S3Client s3C
}

public Runnable createDataFileLoader(DataFilePartition dataFilePartition, TableInfo tableInfo) {
final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
ExportRecordConverter recordProcessor = new ExportRecordConverter(bufferAccumulator, tableInfo, pluginMetrics);

DataFileCheckpointer checkpointer = new DataFileCheckpointer(coordinator, dataFilePartition);

// Start a data loader thread.
DataFileLoader loader = DataFileLoader.builder()
.s3ObjectReader(objectReader)
DataFileLoader loader = DataFileLoader.builder(objectReader, pluginMetrics, buffer)
.bucketName(dataFilePartition.getBucket())
.key(dataFilePartition.getKey())
.recordConverter(recordProcessor)
.tableInfo(tableInfo)
.checkpointer(checkpointer)
.startLine(dataFilePartition.getProgressState().get().getLoaded())
.build();
DataFileScheduler.java
@@ -63,8 +63,6 @@ public DataFileScheduler(EnhancedSourceCoordinator coordinator, DataFileLoaderFa
this.coordinator = coordinator;
this.pluginMetrics = pluginMetrics;
this.loaderFactory = loaderFactory;


executor = Executors.newFixedThreadPool(MAX_JOB_COUNT);

this.exportFileSuccessCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT);
@@ -76,7 +74,7 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) {
String tableArn = getTableArn(exportArn);

TableInfo tableInfo = getTableInfo(tableArn);

Runnable loader = loaderFactory.createDataFileLoader(dataFilePartition, tableInfo);
CompletableFuture runLoader = CompletableFuture.runAsync(loader, executor);
runLoader.whenComplete(completeDataLoader(dataFilePartition));
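For readers unfamiliar with the flow above: the loader runs on the thread pool and the completion callback fires when it finishes or fails. A minimal self-contained sketch of the same runAsync/whenComplete pattern:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RunAsyncDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Runnable loader = () -> System.out.println("loading data file...");
        CompletableFuture<Void> runLoader = CompletableFuture.runAsync(loader, executor);
        // whenComplete receives (result, throwable); throwable is non-null on failure.
        runLoader.whenComplete((result, ex) -> {
            if (ex == null) System.out.println("load succeeded");
            else System.out.println("load failed: " + ex.getMessage());
        });
        runLoader.join();
        executor.shutdown();
    }
}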
@@ -166,6 +164,17 @@ private BiConsumer completeDataLoader(DataFilePartition dataFilePartition) {
};
}

/**
* There is a global state, with the export ARN as sourcePartitionKey,
* that tracks the number of files processed. <br/>
* Each time the load of a data file completes,
* the state must be updated.<br/>
* Note that updates may conflict because multiple threads update the same state,
* so retries are required.
*
* @param exportArn Export ARN.
* @param loaded Number of records loaded.
*/
private void updateState(String exportArn, int loaded) {

String streamArn = getStreamArn(exportArn);
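The retry described in that Javadoc could look roughly like the following (a sketch using hypothetical stand-ins for the coordinator API, since the merged method body is elided in this diff):

import java.util.Optional;

public class UpdateStateSketch {
    // Hypothetical stand-ins; the real EnhancedSourceCoordinator API differs.
    interface Coordinator {
        Optional<LoadStatusView> getGlobalState(String sourcePartitionKey);
        boolean tryUpdate(String sourcePartitionKey, LoadStatusView updated); // false on conflict
    }

    record LoadStatusView(int loadedFiles, long loadedRecords) { }

    static void updateState(Coordinator coordinator, String exportArn, long loaded) {
        // Multiple loader threads update the same global state, so a write can
        // lose the race; re-read and retry until the update is accepted.
        while (true) {
            LoadStatusView current = coordinator.getGlobalState(exportArn).orElseThrow();
            LoadStatusView next = new LoadStatusView(
                    current.loadedFiles() + 1,
                    current.loadedRecords() + loaded);
            if (coordinator.tryUpdate(exportArn, next)) {
                return; // this thread's update was accepted
            }
            // Another thread updated the state first; loop to read the fresh value.
        }
    }
}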
ExportSummary.java
@@ -57,7 +57,7 @@ public class ExportSummary {
private long billedSizeBytes;

@JsonProperty("itemCount")
private int itemCount;
private long itemCount;

@JsonProperty("outputFormat")
private String outputFormat;
@@ -115,7 +115,7 @@ public long getBilledSizeBytes() {
return billedSizeBytes;
}

public int getItemCount() {
public long getItemCount() {
return itemCount;
}

LoadStatus.java
@@ -18,11 +18,11 @@ public class LoadStatus {

private int loadedFiles;

private int totalRecords;
private long totalRecords;

private int loadedRecords;
private long loadedRecords;

public LoadStatus(int totalFiles, int loadedFiles, int totalRecords, int loadedRecords) {
public LoadStatus(int totalFiles, int loadedFiles, long totalRecords, long loadedRecords) {
this.totalFiles = totalFiles;
this.loadedFiles = loadedFiles;
this.totalRecords = totalRecords;
@@ -45,19 +45,19 @@ public void setLoadedFiles(int loadedFiles) {
this.loadedFiles = loadedFiles;
}

public int getTotalRecords() {
public long getTotalRecords() {
return totalRecords;
}

public void setTotalRecords(int totalRecords) {
this.totalRecords = totalRecords;
}

public int getLoadedRecords() {
public long getLoadedRecords() {
return loadedRecords;
}

public void setLoadedRecords(int loadedRecords) {
public void setLoadedRecords(long loadedRecords) {
this.loadedRecords = loadedRecords;
}

@@ -72,10 +72,10 @@ public Map<String, Object> toMap() {

public static LoadStatus fromMap(Map<String, Object> map) {
return new LoadStatus(
(int) map.get(TOTAL_FILES),
(int) map.get(LOADED_FILES),
(int) map.get(TOTAL_RECORDS),
(int) map.get(LOADED_RECORDS)
((Number) map.get(TOTAL_FILES)).intValue(),
((Number) map.get(LOADED_FILES)).intValue(),
((Number) map.get(TOTAL_RECORDS)).longValue(),
((Number) map.get(LOADED_RECORDS)).longValue()
);
}
}
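The fromMap change above guards against a subtle deserialization issue: numeric map values may come back as Integer or Long depending on magnitude, so a hard cast can throw ClassCastException at runtime. A small demonstration of why routing through Number is safer:

import java.util.Map;

public class NumberCastDemo {
    public static void main(String[] args) {
        // Values round-tripped through JSON may be materialized as Integer or
        // Long depending on their size.
        Map<String, Object> map = Map.of("small", 7, "big", 5_000_000_000L);

        long small = ((Number) map.get("small")).longValue(); // Integer -> long: fine
        long big = ((Number) map.get("big")).longValue();     // Long -> long: fine
        System.out.println(small + " " + big);

        // By contrast, a direct cast fails when the boxed type does not match:
        // long bad = (long) map.get("small"); // ClassCastException: Integer cannot be cast to Long
    }
}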