-
Notifications
You must be signed in to change notification settings - Fork 211
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial PR for stream support for Postgres in Rds source #5310
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Hai Yan <[email protected]>
Signed-off-by: Hai Yan <[email protected]>
Signed-off-by: Hai Yan <[email protected]>
Signed-off-by: Hai Yan <[email protected]>
Signed-off-by: Hai Yan <[email protected]>
@@ -121,21 +133,23 @@ public void start(Buffer<Record<Event>> buffer) { | |||
} | |||
|
|||
if (sourceConfig.isStreamEnabled()) { | |||
BinlogClientFactory binaryLogClientFactory = new BinlogClientFactory(sourceConfig, rdsClient, dbMetadata); | |||
ReplicationLogClientFactory replicationLogClientFactory = new ReplicationLogClientFactory(sourceConfig, rdsClient, dbMetadata); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class needs some refactor. We should follow single responsibility principle here. This class has dependency on MySQL and Postgres Client. We should move these dependencies to separate class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did some refactoring and extracted out the creation of schemaManagers in separate classes. The ReplicationLogClientFactory
already hides the creation of MySQL/Postgres clients.
@@ -16,6 +16,12 @@ public class StreamProgressState { | |||
@JsonProperty("currentPosition") | |||
private BinlogCoordinate currentPosition; | |||
|
|||
@JsonProperty("currentLsn") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider creating new class to separate MySQL vs Postgres properties.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This model is used in serialization/deserialization by the source coordinator, it's a bit more involved to separate into two classes. I added a TODO in code and will address it in followup PRs.
@@ -10,7 +10,7 @@ | |||
import java.sql.SQLException; | |||
import java.util.Properties; | |||
|
|||
public class ConnectionManager { | |||
public class MySqlConnectionManager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider creating ConnectionManager interface and implementation for MySql/Postgres ConnectionManager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created the interface.
// If it's INSERT/UPDATE/DELETE, prepare events | ||
// If it's a COMMIT, convert all prepared events and send to buffer | ||
char messageType = (char) msg.get(); | ||
if (messageType == 'B') { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using enums instead of char for message types for type safety, readability and maintainability
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
pipelineEvents = new ArrayList<>(); | ||
} | ||
|
||
public void process(ByteBuffer msg) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implement the missing functionality for UPDATE/DELETE message types 'K' and 'O' ?
https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can ByteBuffer msg be null ? Is there any validation to be done on msg ByteBuffer ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implement the missing functionality for UPDATE/DELETE message types 'K' and 'O' ?
Done. I updated the UPDATE message processor to include 'K' (primary keys were updated) and 'O' (old row data is included). For DELETE, it shouldn't matter as long as the row data includes primary keys.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can ByteBuffer msg be null ? Is there any validation to be done on msg ByteBuffer ?
There's a null check for msg
in the main while loop in LogicalReplicationClient.connect()
.
long epochMicro = msg.getLong(); | ||
|
||
if (currentLsn != commitLsn) { | ||
// This shouldn't happen |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we consider throwing an exception instead of just logging ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to throw an exception.
Signed-off-by: Hai Yan <[email protected]>
c61da05
to
81c5883
Compare
Signed-off-by: Hai Yan <[email protected]>
import java.util.Map; | ||
|
||
public enum ColumnType { | ||
BOOLEAN(16, "boolean"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are those typeId from postgres? Is there way we can use their SDK variables instead of the hardcoded number
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, they are from postgres, but I don't think they have those in their libraries. Postgres has a system table pg_type
that contains those type information. So an alternative would be to query the pg_type
table the type ids and other type information.
throw new RuntimeException("Commit LSN does not match current LSN, skipping"); | ||
} | ||
|
||
writeToBuffer(bufferAccumulator); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QUES: will we flush uncommitted messages if numberOfRecordsToAccumulate or bufferTimeout is reached? If yes, Is it something we want to avoid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This writeToBuffer() is called when processing the COMMIT message. So we will only write records to buffer when the changes are committed.
} | ||
|
||
public static ColumnType getByTypeId(int typeId) { | ||
return TYPE_ID_MAP.get(typeId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add exception handling for typeId
// Create replication slot | ||
PGReplicationConnection replicationConnection = pgConnection.getReplicationAPI(); | ||
try { | ||
replicationConnection.createReplicationSlot() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check if the replication slot exist and reuse if one exist.
retry++; | ||
} | ||
LOG.warn("Failed to get primary keys for table {}", table); | ||
return List.of(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exception handling when PrimaryKey is not present vs Internal DB Exception/Errors.
.withSlotOption("proto_version", "1") | ||
.withSlotOption("publication_names", "my_publication"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Move the constant to static variable
- Use the created publication names.
stream.setFlushedLSN(lsn); | ||
stream.setAppliedLSN(lsn); | ||
} catch (Exception e) { | ||
LOG.error("Exception while processing Postgres replication stream. ", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add metrics in this class for exception, error and success cases.
final long eventTimestampMillis = currentEventTimestamp; | ||
|
||
char typeId = (char) msg.get(); | ||
if (typeId == 'N') { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move the typeId to enum
Description
Initial PR for stream support for Postgres in Rds source. The implementation uses the existing structure for MySQL stream support. The correspondence is as follows:
BinaryLogClient
/BinlogClientWrapper
LogicalReplicationClient
MySqlConnectionManager
(used to beConnectionManager
)PostgresConnectionManager
MySqlSchemaManager
(used to beSchemaManager
)PostgresSchemaManager
Major changes:
This is an initial PR, will follow up with more changes to add e2e ack, checkpointing, resync, data type mapping etc. for Postgres.
Testing
Tested Data Prepper locally against Aurora Postgres DB.
Issues Resolved
Contributes to #5309
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.