Speed Up initial snapshot from MongoDB #3675
Conversation
…s and using distributed node to ingest. Signed-off-by: Haidong <[email protected]>
| force_update | NO | FALSE | Boolean | When restarting or updating a pipeline, whether to force all connectors to update their config even if the connector name already exists. By default, if the connector name exists, the config will not be updated. The connector name is `<topic_prefix>.<table_name>`. |

| Option | Required | Default | Type | Description |
|----------------|----------|---------------|--------------------|-------------|
| hostname | YES | | String | The hostname of MySQL. |
MySQL -> MongoDB server
OK
| Option | Required | Default | Type | Description |
|----------------|----------|---------------|--------------------|-------------|
| hostname | YES | | String | The hostname of MySQL. |
| port | NO | 27017 | String | The port of MySQL. |
MySQL -> MongoDB server
OK
@Override
public void stop() {
    super.stop();
    if (shouldStartInitialLoad() && Objects.nonNull(mongoDBService) && Objects.nonNull(sourceCoordinator)) {
How about when ingestion_mode is just stream?
For stream:
- the initial load will not be executed (even in the start function).
- it will only start Kafka Connect, which is in the superclass.
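For context, a minimal sketch of how that gate could look. The helper names and the config accessor below are assumptions for illustration, not necessarily the PR's actual API:

@Override
public void start(Buffer<Record<Object>> buffer) {
    super.start(buffer); // always starts Kafka Connect, covering the stream path
    if (shouldStartInitialLoad()) {
        startInitialLoad(); // hypothetical: only runs for export and export_stream modes
    }
}

// Hypothetical helper: plain "stream" mode skips the initial load entirely.
private boolean shouldStartInitialLoad() {
    final String mode = mongoDBConfig.getIngestionMode(); // accessor name assumed
    return "export".equals(mode) || "export_stream".equals(mode);
}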
public class MongoDBHelper {

    public static MongoClient getMongoClient(final MongoDBConfig mongoDBConfig) {
        String template = "mongodb://%s:%s@%s:%s/?replicaSet=rs0&directConnection=true&readpreference=%s&ssl=%s&tlsAllowInvalidHostnames=%s";
Move this to a static constant.
OK
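For illustration, the requested refactor could look like the sketch below. The constant name and the config accessor names are assumptions, not the PR's actual code:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;

public class MongoDBHelper {
    // Template hoisted to a static constant, per the review comment.
    private static final String CONNECTION_STRING_TEMPLATE =
            "mongodb://%s:%s@%s:%s/?replicaSet=rs0&directConnection=true"
                    + "&readpreference=%s&ssl=%s&tlsAllowInvalidHostnames=%s";

    public static MongoClient getMongoClient(final MongoDBConfig config) {
        // Accessor names below are illustrative only.
        final String connectionString = String.format(CONNECTION_STRING_TEMPLATE,
                config.getUsername(), config.getPassword(),
                config.getHostname(), config.getPort(),
                config.getReadPreference(), config.getSsl(),
                config.getTlsAllowInvalidHostnames());
        return MongoClients.create(connectionString);
    }
}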
}

private void startProcessPartition(SourcePartition<MongoDBSnapshotProgressState> partition) {
    List<String> partitionKeys = List.of(partition.getPartitionKey().split("\\|"));
Move the split delimiter to a constant. Are all the partition keys in this format? Does the user have control over this format?
OK. Yes, all partition keys are in this format. The user doesn't have control over this format.
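A sketch of what centralizing that delimiter could look like, given the key layout implied by the snippets in this PR; the constant name is an assumption:

private static final String PARTITION_KEY_SPLITTER = "\\|"; // regex for the '|' separator

private void startProcessPartition(final SourcePartition<MongoDBSnapshotProgressState> partition) {
    // Key layout (inferred from this PR's snippets): <db.collection>|<gte>|<lte>|<className>
    final List<String> partitionKeys = List.of(partition.getPartitionKey().split(PARTITION_KEY_SPLITTER));
    final String collection = partitionKeys.get(0);
    final String gte = partitionKeys.get(1);
    final String lte = partitionKeys.get(2);
    final String className = partitionKeys.get(3);
    // ...the rest of the partition processing uses these four fields.
}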
        .sort(new Document("_id", 1))
        .limit(1);
} catch (Exception e) {
    throw new RuntimeException(e);
We should log this error and emit metrics.
The runtime exception should already be logged, and metrics emitted, by the source coordinator, which is what executes this. But I will log another error message anyway.
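For reference, the agreed pattern (log locally, bump a metric, then rethrow so the coordinator still sees the failure) could look like this; the method name, LOG field, and counter are assumptions for illustration:

private FindIterable<Document> findFirstDocument(final MongoCollection<Document> collection) {
    try {
        return collection.find()
                .sort(new Document("_id", 1))
                .limit(1);
    } catch (final Exception e) {
        // Surface the failure locally even though the coordinator also logs it.
        LOG.error("Failed to query the first document while partitioning the collection.", e);
        exportErrorCounter.increment(); // hypothetical metric
        throw new RuntimeException(e);  // coordinator still records the failure
    }
}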
final String lte = partitionKeys.get(2);
final String className = partitionKeys.get(3);
if (collection.size() < 2) {
    throw new RuntimeException("Invalid collection name. Must be in db.collection format.");
Can this validation be done upfront?
Ok, updated
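The upfront check could be as simple as validating the configured names once at startup, e.g. the sketch below; the method name and exception choice are assumptions:

// Validate once when the source starts, instead of on every partition.
private void validateCollectionNames(final List<String> collectionNames) {
    for (final String name : collectionNames) {
        if (name.split("\\.").length < 2) {
            throw new IllegalArgumentException(
                    "Invalid collection name [" + name + "]; must be in db.collection format.");
        }
    }
}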
    progressState.setFailure(failedRecords);
    sourceCoordinator.saveProgressStateForPartition(partition.getPartitionKey(), progressState);
} catch (Exception e) {
    throw new RuntimeException(e);
Log the error and add metrics.
It is already logged. See this part of the code:
try {
final Optional<AcknowledgementSet> acknowledgementSet = createAcknowledgementSet(snapshotPartition.get());
this.startProcessPartition(snapshotPartition.get());
if (acknowledgementSet.isEmpty()) {
sourceCoordinator.completePartition(snapshotPartition.get().getPartitionKey(), false);
} else {
sourceCoordinator.updatePartitionForAcknowledgmentWait(snapshotPartition.get().getPartitionKey(), ACKNOWLEDGEMENT_SET_TIMEOUT);
acknowledgementSet.get().complete();
}
successPartitionCounter.increment();
} catch (final Exception e) {
LOG.error("Received an exception while processing the partition.", e);
sourceCoordinator.giveUpPartitions();
failureParitionCounter.increment();
}
private static final String EVENT_SOURCE_COLLECTION_ATTRIBUTE = "__collection";
private static final String EVENT_SOURCE_DB_ATTRIBUTE = "__source_db";
Any reason for this naming pattern?
As we agreed, we are running Kafka Connect for stream + initial snapshot together. This naming pattern comes from Kafka Connect when streaming the data; we need to keep it consistent.
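For illustration, keeping snapshot events consistent with the stream path might look like tagging each event's metadata with the same attribute keys. The helper below is hypothetical; setAttribute is assumed to be Data Prepper's EventMetadata API:

private static final String EVENT_SOURCE_COLLECTION_ATTRIBUTE = "__collection";
private static final String EVENT_SOURCE_DB_ATTRIBUTE = "__source_db";

// Hypothetical helper: tag a snapshot event the same way the Kafka Connect
// stream path tags streamed events, so downstream routing treats them alike.
private void tagEvent(final Event event, final String db, final String collection) {
    event.getMetadata().setAttribute(EVENT_SOURCE_DB_ATTRIBUTE, db);
    event.getMetadata().setAttribute(EVENT_SOURCE_COLLECTION_ATTRIBUTE, collection);
}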
    successRecords += 1;
} catch (Exception e) {
    LOG.error("failed to add record to buffer with error {}", e.getMessage());
    failureItemsCounter.increment();
Will these records be retried by the acknowledgementSet manager if acknowledgement is enabled?
From my understanding, it will, but please double-confirm with Tylor. I am writing this the same way as other places where acknowledgementSet is used.
If this happens and this record is already added to the acknowledgment set, then it would end up resulting in an acknowledgment timeout and be retried eventually. But these failures should really go to a DLQ of some sort, which we don't have right now
| port | NO | 27017 | String | The port of MongoDB server. |
| ssl | NO | FALSE | Boolean | Connector will use SSL to connect to MongoDB instances. |
| ingestion_mode | NO | export_stream | String | MongoDB ingestion mode. Available options: export_stream, stream, export |
| export_config | NO | | ExportConfig | The Export Config |
Should this just be export to the user? That would align with dynamodb.
It is not really an export; there is no export feature in MongoDB like DynamoDB's. You really connect to the DB and read the data from it, so calling it ingestion makes more sense.
public void start(Buffer<Record<Object>> buffer) {
    super.start(buffer);
    if (shouldStartInitialLoad()) {
        LOG.info("Starting initial load");
Initial load of what? Just the export? Maybe add some description here.
Initial load of existing MongoDB data. It is different from DynamoDB's export; we call it export just to make it feel similar for users.
}

@Override
public <T> void setSourceCoordinator(final SourceCoordinator<T> sourceCoordinator) {
Is this source coordinator going to have enough flexibility to perform the streams? In the dynamodb source we use an EnhancedSourceCoordinator, which can be swapped in by implementing UsesEnhancedSourceCoordination.
For stream, we are using Kafka Connect (Debezium connectors). We do not need to do what DynamoDB has done.
Description
Speed up the initial snapshot from MongoDB by splitting a single collection into chunks/partitions and allowing parallel partition processing across multiple OSDP nodes.
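A hypothetical sketch of the chunking idea using the MongoDB Java driver (the PR's actual implementation may differ): walk the collection in _id order and record a boundary every chunkSize positions; each consecutive pair of boundaries becomes an independent partition that any OSDP node can claim from the source coordinator.

import java.util.ArrayList;
import java.util.List;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

static List<Object> collectChunkBoundaries(final MongoCollection<Document> collection,
                                           final int chunkSize) {
    final List<Object> boundaries = new ArrayList<>();
    Document start = collection.find()
            .sort(new Document("_id", 1))
            .limit(1)
            .first();
    while (start != null) {
        boundaries.add(start.get("_id"));
        // Jump chunkSize documents ahead to find the start of the next chunk;
        // when fewer than chunkSize documents remain, first() returns null.
        start = collection.find(new Document("_id", new Document("$gt", start.get("_id"))))
                .sort(new Document("_id", 1))
                .skip(chunkSize - 1)
                .limit(1)
                .first();
    }
    return boundaries;
}

Each boundary _id would be the inclusive gte of one partition, the next boundary its exclusive upper bound, and the last partition would run to the end of the collection.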
Issues Resolved
Resolves #3673
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.