Initial PR for stream support for Postgres in Rds source #5310

Open · wants to merge 7 commits into main
1 change: 1 addition & 0 deletions data-prepper-plugins/rds-source/build.gradle
@@ -23,6 +23,7 @@ dependencies {

implementation 'com.zendesk:mysql-binlog-connector-java:0.29.2'
implementation 'com.mysql:mysql-connector-j:8.4.0'
+implementation 'org.postgresql:postgresql:42.7.4'

compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java
@@ -14,6 +14,7 @@
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
+import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
import org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler;
import org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler;
import org.opensearch.dataprepper.plugins.source.rds.export.ExportTaskManager;
@@ -26,9 +27,13 @@
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.resync.ResyncScheduler;
import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager;
+import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManagerFactory;
+import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlConnectionManager;
+import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlSchemaManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;
-import org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientFactory;
+import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManagerFactory;
+import org.opensearch.dataprepper.plugins.source.rds.stream.ReplicationLogClientFactory;
import org.opensearch.dataprepper.plugins.source.rds.stream.StreamScheduler;
import org.opensearch.dataprepper.plugins.source.rds.utils.IdentifierShortener;
import org.slf4j.Logger;
@@ -37,6 +42,7 @@
import software.amazon.awssdk.services.s3.S3Client;

import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -101,9 +107,16 @@ public void start(Buffer<Record<Event>> buffer) {
new ClusterApiStrategy(rdsClient) : new InstanceApiStrategy(rdsClient);
final DbMetadata dbMetadata = rdsApiStrategy.describeDb(sourceConfig.getDbIdentifier());
final String s3PathPrefix = getS3PathPrefix();

final SchemaManager schemaManager = getSchemaManager(sourceConfig, dbMetadata);
-        final Map<String, Map<String, String>> tableColumnDataTypeMap = getColumnDataTypeMap(schemaManager);
-        final DbTableMetadata dbTableMetadata = new DbTableMetadata(dbMetadata, tableColumnDataTypeMap);
+        DbTableMetadata dbTableMetadata;
+        if (sourceConfig.getEngine() == EngineType.MYSQL) {
+            final Map<String, Map<String, String>> tableColumnDataTypeMap = getColumnDataTypeMap(
+                    (MySqlSchemaManager) schemaManager);
+            dbTableMetadata = new DbTableMetadata(dbMetadata, tableColumnDataTypeMap);
+        } else {
+            dbTableMetadata = new DbTableMetadata(dbMetadata, Collections.emptyMap());
+        }

leaderScheduler = new LeaderScheduler(
sourceCoordinator, sourceConfig, s3PathPrefix, schemaManager, dbTableMetadata);
@@ -121,21 +134,23 @@ public void start(Buffer<Record<Event>> buffer) {
}

if (sourceConfig.isStreamEnabled()) {
-            BinlogClientFactory binaryLogClientFactory = new BinlogClientFactory(sourceConfig, rdsClient, dbMetadata);
+            ReplicationLogClientFactory replicationLogClientFactory = new ReplicationLogClientFactory(sourceConfig, rdsClient, dbMetadata);
Review comment (Member):
This class needs some refactoring. We should follow the single responsibility principle here: this class depends on both the MySQL and the Postgres client, and those dependencies should move to a separate class.

Reply (Collaborator, author):
I did some refactoring and extracted the creation of the schema managers into separate classes. The ReplicationLogClientFactory already hides the creation of the MySQL/Postgres clients. (A sketch of that factory shape follows.)

if (sourceConfig.isTlsEnabled()) {
-                binaryLogClientFactory.setSSLMode(SSLMode.REQUIRED);
+                replicationLogClientFactory.setSSLMode(SSLMode.REQUIRED);
} else {
-                binaryLogClientFactory.setSSLMode(SSLMode.DISABLED);
+                replicationLogClientFactory.setSSLMode(SSLMode.DISABLED);
}

streamScheduler = new StreamScheduler(
-                    sourceCoordinator, sourceConfig, s3PathPrefix, binaryLogClientFactory, buffer, pluginMetrics, acknowledgementSetManager, pluginConfigObservable);
+                    sourceCoordinator, sourceConfig, s3PathPrefix, replicationLogClientFactory, buffer, pluginMetrics, acknowledgementSetManager, pluginConfigObservable);
runnableList.add(streamScheduler);

-            resyncScheduler = new ResyncScheduler(
-                    sourceCoordinator, sourceConfig, getQueryManager(sourceConfig, dbMetadata), s3PathPrefix, buffer, pluginMetrics, acknowledgementSetManager);
-            runnableList.add(resyncScheduler);
+            if (sourceConfig.getEngine() == EngineType.MYSQL) {
+                resyncScheduler = new ResyncScheduler(
+                        sourceCoordinator, sourceConfig, getQueryManager(sourceConfig, dbMetadata), s3PathPrefix, buffer, pluginMetrics, acknowledgementSetManager);
+                runnableList.add(resyncScheduler);
+            }
}

executor = Executors.newFixedThreadPool(runnableList.size());
@@ -164,19 +179,14 @@ public void shutdown() {
}

private SchemaManager getSchemaManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
-        final ConnectionManager connectionManager = new ConnectionManager(
-                dbMetadata.getEndpoint(),
-                dbMetadata.getPort(),
-                sourceConfig.getAuthenticationConfig().getUsername(),
-                sourceConfig.getAuthenticationConfig().getPassword(),
-                sourceConfig.isTlsEnabled());
-        return new SchemaManager(connectionManager);
+        final ConnectionManager connectionManager = new ConnectionManagerFactory(sourceConfig, dbMetadata).getConnectionManager();
+        return new SchemaManagerFactory(connectionManager).getSchemaManager();
}

private QueryManager getQueryManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
final String readerEndpoint = dbMetadata.getReaderEndpoint() != null ? dbMetadata.getReaderEndpoint() : dbMetadata.getEndpoint();
final int readerPort = dbMetadata.getReaderPort() == 0 ? dbMetadata.getPort() : dbMetadata.getReaderPort();
-        final ConnectionManager readerConnectionManager = new ConnectionManager(
+        final MySqlConnectionManager readerConnectionManager = new MySqlConnectionManager(
readerEndpoint,
readerPort,
sourceConfig.getAuthenticationConfig().getUsername(),
@@ -203,13 +213,11 @@ private String getS3PathPrefix() {
return s3PathPrefix;
}

-    private Map<String, Map<String, String>> getColumnDataTypeMap(final SchemaManager schemaManager) {
+    private Map<String, Map<String, String>> getColumnDataTypeMap(final MySqlSchemaManager schemaManager) {
return sourceConfig.getTableNames().stream()
.collect(Collectors.toMap(
fullTableName -> fullTableName,
fullTableName -> schemaManager.getColumnDataTypes(fullTableName.split("\\.")[0], fullTableName.split("\\.")[1])
));
}


}
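
The ConnectionManagerFactory and SchemaManagerFactory used in getSchemaManager above are not included in this excerpt. A plausible minimal shape, assuming a hypothetical PostgresConnectionManager counterpart (the factory names, MySqlConnectionManager, MySqlSchemaManager, and PostgresSchemaManager do appear elsewhere in this PR; everything else is a guess):

    // Sketch under assumptions: constructor shapes and PostgresConnectionManager
    // are illustrative, not taken from the PR.
    public class ConnectionManagerFactory {

        private final RdsSourceConfig sourceConfig;
        private final DbMetadata dbMetadata;

        public ConnectionManagerFactory(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
            this.sourceConfig = sourceConfig;
            this.dbMetadata = dbMetadata;
        }

        public ConnectionManager getConnectionManager() {
            if (sourceConfig.getEngine() == EngineType.MYSQL) {
                return new MySqlConnectionManager(
                        dbMetadata.getEndpoint(),
                        dbMetadata.getPort(),
                        sourceConfig.getAuthenticationConfig().getUsername(),
                        sourceConfig.getAuthenticationConfig().getPassword(),
                        sourceConfig.isTlsEnabled());
            }
            // Hypothetical Postgres counterpart; a Postgres connection also needs a database name.
            return new PostgresConnectionManager(
                    dbMetadata.getEndpoint(),
                    dbMetadata.getPort(),
                    sourceConfig.getAuthenticationConfig().getUsername(),
                    sourceConfig.getAuthenticationConfig().getPassword(),
                    sourceConfig.isTlsEnabled());
        }
    }

    // In a separate file; dispatches on the connection manager it was given.
    public class SchemaManagerFactory {

        private final ConnectionManager connectionManager;

        public SchemaManagerFactory(final ConnectionManager connectionManager) {
            this.connectionManager = connectionManager;
        }

        public SchemaManager getSchemaManager() {
            if (connectionManager instanceof MySqlConnectionManager) {
                return new MySqlSchemaManager(connectionManager);
            }
            return new PostgresSchemaManager(connectionManager);
        }
    }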
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/EngineType.java
@@ -13,7 +13,8 @@

public enum EngineType {

-    MYSQL("mysql");
+    MYSQL("mysql"),
+    POSTGRES("postgres");

private static final Map<String, EngineType> ENGINE_TYPE_MAP = Arrays.stream(EngineType.values())
.collect(Collectors.toMap(
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/StreamProgressState.java
@@ -10,26 +10,74 @@
import org.opensearch.dataprepper.plugins.source.rds.model.ForeignKeyRelation;

import java.util.List;
+import java.util.Map;

public class StreamProgressState {

-    @JsonProperty("currentPosition")
-    private BinlogCoordinate currentPosition;
+    // TODO: separate MySQL and Postgres properties into different progress state classes
+    // Common
+    @JsonProperty("engineType")
+    private String engineType;

@JsonProperty("waitForExport")
private boolean waitForExport = false;

+    /**
+     * Map of table name to primary keys
+     */
+    @JsonProperty("primaryKeyMap")
+    private Map<String, List<String>> primaryKeyMap;

+    // For MySQL
+    @JsonProperty("currentPosition")
+    private BinlogCoordinate currentPosition;

@JsonProperty("foreignKeyRelations")
private List<ForeignKeyRelation> foreignKeyRelations;

+    // For Postgres
+    @JsonProperty("currentLsn")
+    private String currentLsn;

Review comment (Member):
Consider creating a new class to separate the MySQL and Postgres properties.

Reply (Collaborator, author):
This model is used for serialization/deserialization by the source coordinator, so it's a bit more involved to separate it into two classes. I added a TODO in the code and will address it in follow-up PRs. (One possible shape is sketched after this class.)

+    @JsonProperty("replicationSlotName")
+    private String replicationSlotName;

+    public String getEngineType() {
+        return engineType;
+    }
+
+    public void setEngineType(String engineType) {
+        this.engineType = engineType;
+    }

public BinlogCoordinate getCurrentPosition() {
return currentPosition;
}

+    public String getCurrentLsn() {
+        return currentLsn;
+    }

+    public Map<String, List<String>> getPrimaryKeyMap() {
+        return primaryKeyMap;
+    }
+
+    public void setPrimaryKeyMap(Map<String, List<String>> primaryKeyMap) {
+        this.primaryKeyMap = primaryKeyMap;
+    }

+    public String getReplicationSlotName() {
+        return replicationSlotName;
+    }

public void setCurrentPosition(BinlogCoordinate currentPosition) {
this.currentPosition = currentPosition;
}

+    public void setReplicationSlotName(String replicationSlotName) {
+        this.replicationSlotName = replicationSlotName;
+    }

public boolean shouldWaitForExport() {
return waitForExport;
}
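
Following up on the review thread above: one way to split this state later, while keeping the source coordinator's JSON round-trip intact, is Jackson's polymorphic type handling. A sketch of one possible follow-up shape; every class name below is hypothetical and none of this is in the PR:

    // Hypothetical follow-up design, not part of this PR: a Jackson type
    // discriminator lets the coordinator deserialize the right subclass.
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.annotation.JsonSubTypes;
    import com.fasterxml.jackson.annotation.JsonTypeInfo;
    import java.util.List;
    import java.util.Map;

    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "engineType")
    @JsonSubTypes({
            @JsonSubTypes.Type(value = MySqlStreamState.class, name = "mysql"),
            @JsonSubTypes.Type(value = PostgresStreamState.class, name = "postgres")
    })
    abstract class EngineStreamState {
        @JsonProperty("waitForExport")
        boolean waitForExport;

        @JsonProperty("primaryKeyMap")
        Map<String, List<String>> primaryKeyMap;
    }

    class MySqlStreamState extends EngineStreamState {
        @JsonProperty("currentPosition")
        BinlogCoordinate currentPosition;

        @JsonProperty("foreignKeyRelations")
        List<ForeignKeyRelation> foreignKeyRelations;
    }

    class PostgresStreamState extends EngineStreamState {
        @JsonProperty("currentLsn")
        String currentLsn;

        @JsonProperty("replicationSlotName")
        String replicationSlotName;
    }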
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/ColumnType.java
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres;

import java.util.HashMap;
import java.util.Map;

public enum ColumnType {
BOOLEAN(16, "boolean"),
Review comment (Collaborator):
Are those type IDs from Postgres? Is there a way to use constants from their SDK instead of the hardcoded numbers?

Reply (Collaborator, author):
Yes, they are from Postgres, but I don't think their libraries expose them. Postgres has a system catalog, pg_type, that contains that type information, so an alternative would be to query pg_type for the type IDs and other type metadata. (A sketch of that query is included after this class.)

SMALLINT(21, "smallint"),
INTEGER(23, "integer"),
BIGINT(20, "bigint"),
REAL(700, "real"),
DOUBLE_PRECISION(701, "double precision"),
NUMERIC(1700, "numeric"),
TEXT(25, "text"),
VARCHAR(1043, "varchar"),
DATE(1082, "date"),
TIME(1083, "time"),
TIMESTAMP(1114, "timestamp"),
TIMESTAMPTZ(1184, "timestamptz"),
UUID(2950, "uuid"),
JSON(114, "json"),
JSONB(3802, "jsonb");

private final int typeId;
private final String typeName;

private static final Map<Integer, ColumnType> TYPE_ID_MAP = new HashMap<>();

static {
for (ColumnType type : values()) {
TYPE_ID_MAP.put(type.typeId, type);
}
}

ColumnType(int typeId, String typeName) {
this.typeId = typeId;
this.typeName = typeName;
}

public int getTypeId() {
return typeId;
}

public String getTypeName() {
return typeName;
}

public static ColumnType getByTypeId(int typeId) {
return TYPE_ID_MAP.get(typeId);
    }

Review comment (Member):
Add exception handling for typeId. (A possible variant is sketched after this class.)

public static String getTypeNameByEnum(ColumnType columnType) {
return columnType.getTypeName();
}
}
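
Two sketches tied to the review comments above. First, a possible hardening of getByTypeId so that unknown OIDs fail fast instead of returning null; the exception type is an illustrative choice:

    public static ColumnType getByTypeId(int typeId) {
        final ColumnType columnType = TYPE_ID_MAP.get(typeId);
        if (columnType == null) {
            // Fail fast for OIDs this enum does not cover.
            throw new IllegalArgumentException("Unsupported PostgreSQL type OID: " + typeId);
        }
        return columnType;
    }

Second, the pg_type alternative mentioned in the thread. pg_type is a real PostgreSQL catalog, and oid and typname are its columns, but note it stores internal names such as bool and int4 rather than the SQL aliases used in the enum above. The surrounding JDBC code is an illustrative sketch (java.sql imports assumed):

    // Illustrative helper, not in the PR: load OID -> internal type name from pg_type.
    static Map<Integer, String> loadPgTypeOids(final Connection connection) throws SQLException {
        final Map<Integer, String> oidToTypeName = new HashMap<>();
        try (PreparedStatement statement = connection.prepareStatement(
                "SELECT oid, typname FROM pg_type WHERE typname = ANY (?)")) {
            statement.setArray(1, connection.createArrayOf(
                    "text", new String[]{"bool", "int2", "int4", "int8", "text", "jsonb"}));
            try (ResultSet resultSet = statement.executeQuery()) {
                while (resultSet.next()) {
                    oidToTypeName.put(resultSet.getInt("oid"), resultSet.getString("typname"));
                }
            }
        }
        return oidToTypeName;
    }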
data-prepper-plugins/rds-source/src/main/java/.../LeaderScheduler.java
@@ -8,6 +8,7 @@
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
+import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.LeaderPartition;
@@ -17,6 +18,8 @@
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
+import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlSchemaManager;
+import org.opensearch.dataprepper.plugins.source.rds.schema.PostgresSchemaManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.plugins.source.rds.RdsService.S3_PATH_DELIMITER;
@@ -152,22 +156,41 @@ private Map<String, List<String>> getPrimaryKeyMap() {
return sourceConfig.getTableNames().stream()
.collect(Collectors.toMap(
fullTableName -> fullTableName,
-                        fullTableName -> schemaManager.getPrimaryKeys(fullTableName.split("\\.")[0], fullTableName.split("\\.")[1])
+                        fullTableName -> schemaManager.getPrimaryKeys(fullTableName)
));
}

private void createStreamPartition(RdsSourceConfig sourceConfig) {
final StreamProgressState progressState = new StreamProgressState();
+        progressState.setEngineType(sourceConfig.getEngine().toString());
        progressState.setWaitForExport(sourceConfig.isExportEnabled());
-        getCurrentBinlogPosition().ifPresent(progressState::setCurrentPosition);
-        progressState.setForeignKeyRelations(schemaManager.getForeignKeyRelations(sourceConfig.getTableNames()));
+        progressState.setPrimaryKeyMap(getPrimaryKeyMap());
+        if (sourceConfig.getEngine() == EngineType.MYSQL) {
+            getCurrentBinlogPosition().ifPresent(progressState::setCurrentPosition);
+            progressState.setForeignKeyRelations(((MySqlSchemaManager)schemaManager).getForeignKeyRelations(sourceConfig.getTableNames()));
+        } else {
+            // Postgres
+            // Create replication slot, which will mark the starting point for the stream
+            final String publicationName = generatePublicationName();
+            final String slotName = generateReplicationSlotName();
+            ((PostgresSchemaManager)schemaManager).createLogicalReplicationSlot(sourceConfig.getTableNames(), publicationName, slotName);
+            progressState.setReplicationSlotName(slotName);
+        }
StreamPartition streamPartition = new StreamPartition(sourceConfig.getDbIdentifier(), progressState);
sourceCoordinator.createPartition(streamPartition);
}

private Optional<BinlogCoordinate> getCurrentBinlogPosition() {
-        Optional<BinlogCoordinate> binlogCoordinate = schemaManager.getCurrentBinaryLogPosition();
+        Optional<BinlogCoordinate> binlogCoordinate = ((MySqlSchemaManager)schemaManager).getCurrentBinaryLogPosition();
LOG.debug("Current binlog position: {}", binlogCoordinate.orElse(null));
return binlogCoordinate;
}

+    private String generatePublicationName() {
+        return "data_prepper_publication_" + UUID.randomUUID().toString().substring(0, 8);
+    }
+
+    private String generateReplicationSlotName() {
+        return "data_prepper_slot_" + UUID.randomUUID().toString().substring(0, 8);
+    }
}
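
The createLogicalReplicationSlot call above is implemented in PostgresSchemaManager, which is not part of this excerpt. In SQL terms, the publication-plus-slot setup that marks the stream's starting point looks roughly like this. CREATE PUBLICATION and pg_create_logical_replication_slot are standard PostgreSQL; the JDBC wrapper is an assumed sketch, and the PR's actual implementation may differ:

    // Illustrative only (java.sql imports assumed).
    static void createPublicationAndSlot(final Connection connection,
                                         final List<String> tableNames,
                                         final String publicationName,
                                         final String slotName) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            // The publication defines which tables' changes the slot will emit.
            statement.execute("CREATE PUBLICATION " + publicationName
                    + " FOR TABLE " + String.join(", ", tableNames));
            // Creating the slot records the starting LSN on the server; 'pgoutput'
            // is the built-in logical decoding plugin that publications pair with.
            statement.execute("SELECT pg_create_logical_replication_slot('" + slotName + "', 'pgoutput')");
        }
    }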
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/MessageType.java
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.rds.model;

public enum MessageType {
BEGIN('B'),
RELATION('R'),
INSERT('I'),
UPDATE('U'),
DELETE('D'),
COMMIT('C');

private final char value;

MessageType(char value) {
this.value = value;
}

public char getValue() {
return value;
}
}
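
These single-byte values are the message tags of PostgreSQL's pgoutput logical replication protocol; each replication message begins with one of them. A hedged sketch of a lookup-and-dispatch helper follows. The fromValue method does not exist in this PR, and the usage comments assume pgjdbc's PGReplicationStream.readPending() as the source of the buffer:

    // Hypothetical companion to MessageType: map a received tag byte back to the enum.
    public static MessageType fromValue(char value) {
        for (MessageType type : MessageType.values()) {
            if (type.getValue() == value) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unknown pgoutput message type: " + value);
    }

    // Usage sketch while consuming the stream:
    // ByteBuffer buffer = replicationStream.readPending();   // pgjdbc PGReplicationStream
    // if (buffer != null) {
    //     MessageType messageType = MessageType.fromValue((char) buffer.get());
    //     // dispatch on messageType: BEGIN / RELATION / INSERT / UPDATE / DELETE / COMMIT
    // }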