Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vitess VStream Replicator Support #1943

Open
wants to merge 34 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
0458377
Initial WIP implementation of Vitess VStream replicator support
kovyrin Nov 9, 2022
b1d7b77
Switch to using tabs instead of spaces
kovyrin Nov 21, 2022
361f080
More space->tabs conversion
kovyrin Nov 21, 2022
eca0395
Fix a tab vs space issue
kovyrin Nov 22, 2022
d2cbc20
Add Vitess position tracking/recovery support
kovyrin Nov 22, 2022
d140427
Cleanup Position classes (move Vitess stuff into a dedicated class)
kovyrin Nov 22, 2022
7c7abef
Revert some formatting changes introduced by VScode
kovyrin Nov 22, 2022
60bf28e
Another VScode change
kovyrin Nov 22, 2022
4dd9da7
Another VScode change
kovyrin Nov 22, 2022
a3ea1aa
Add a schema migration and change the vgtid column in positions
kovyrin Nov 23, 2022
eee6f01
Stop VStream replicator if the observer dies for whatever reason.
kovyrin Nov 24, 2022
236092e
When running against Vitess, we cannot rely on read_only variable
kovyrin Nov 24, 2022
6e7a4d9
Unsupported event is not a warning
kovyrin Nov 24, 2022
918efaa
Add Vitess test scripts, docs, etc
kovyrin Nov 24, 2022
3b282ce
Intern Vitess keyspace/table/column/type names + enum values to reduc…
kovyrin Nov 24, 2022
b712ae0
Add a cleanup script for Vitess stuff
kovyrin Nov 24, 2022
3265121
Formatting cleanup
kovyrin Nov 24, 2022
8841ba4
Add TLS support for VTgate connections
kovyrin Nov 25, 2022
be4cc0e
Added TLS client cert authentication support
kovyrin Nov 25, 2022
2eabf29
Revert unrelated changes to Position class
kovyrin Nov 26, 2022
c01b8a5
Ignore an empty VGTID from the positions table
kovyrin Nov 30, 2022
9c35476
Restore ports
kovyrin Nov 30, 2022
ea115c6
Ensure we get a PKCS#8 private key
kovyrin Nov 30, 2022
bb3370c
No need to explicitly enable plaintext
kovyrin Nov 30, 2022
1dbb36a
Merge branch 'master' into vitess-experimental
kovyrin Dec 1, 2022
96a8004
Implement user/password auth on grpc calls
kovyrin Dec 5, 2022
92d58f7
Tell the server we want to stop before closing the stream
kovyrin Dec 7, 2022
8e99145
Merge branch 'master' into vitess-experimental
kovyrin Dec 7, 2022
7d8d528
Re-enable opencensus stuff
kovyrin Dec 8, 2022
64f02d1
Remove the extra empty line
kovyrin Dec 8, 2022
e96ef9c
Capture old values from VStream UPDATE events
kovyrin Dec 12, 2022
312aab2
Change vitess ports to not conflict with Vitess dev clusters
kovyrin Dec 12, 2022
be63881
Do not recover from positions with different client ids when running …
kovyrin Feb 1, 2023
ccf2446
Move extremely verbose Vitess replicator logs to the trace level
kovyrin Mar 24, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@
<version>15.3</version>
</dependency>

<dependency>
<groupId>io.vitess</groupId>
<artifactId>vitess-grpc-client</artifactId>
<version>14.0.1</version>
</dependency>

<!-- utils and support libs -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
Expand Down Expand Up @@ -291,7 +297,6 @@
<artifactId>jedis</artifactId>
<version>3.5.1</version>
</dependency>

<!-- metrics -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
Expand Down
95 changes: 58 additions & 37 deletions src/main/java/com/zendesk/maxwell/Maxwell.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.zendesk.maxwell.replication.BinlogConnectorReplicator;
import com.zendesk.maxwell.replication.Position;
import com.zendesk.maxwell.replication.Replicator;
import com.zendesk.maxwell.replication.VStreamReplicator;
import com.zendesk.maxwell.row.HeartbeatRowMap;
import com.zendesk.maxwell.schema.*;
import com.zendesk.maxwell.schema.columndef.ColumnDefCastException;
Expand Down Expand Up @@ -174,7 +175,7 @@ protected Position getInitialPosition() throws Exception {
}

/* fourth method: capture the current master position. */
if ( initial == null ) {
if ( initial == null && !config.vitessEnabled ) {
try ( Connection c = context.getReplicationConnection() ) {
initial = Position.capture(c, config.gtidMode);
}
Expand Down Expand Up @@ -203,7 +204,8 @@ public String getMaxwellVersion() {
static String bootString = "Maxwell v%s is booting (%s), starting at %s";
private void logBanner(AbstractProducer producer, Position initialPosition) {
String producerName = producer.getClass().getSimpleName();
LOGGER.info(String.format(bootString, getMaxwellVersion(), producerName, initialPosition.toString()));
String position = initialPosition != null ? initialPosition.toString() : "the latest position";
LOGGER.info(String.format(bootString, getMaxwellVersion(), producerName, position));
}

/**
Expand Down Expand Up @@ -239,13 +241,8 @@ public void start() throws Exception {
}

private void startInner() throws Exception {
try ( Connection connection = this.context.getReplicationConnection();
Connection rawConnection = this.context.getRawMaxwellConnection() ) {
MaxwellMysqlStatus.ensureReplicationMysqlState(connection);
MaxwellMysqlStatus.ensureMaxwellMysqlState(rawConnection);
if (config.gtidMode) {
MaxwellMysqlStatus.ensureGtidMysqlState(connection);
}
try (Connection rawConnection = this.context.getRawMaxwellConnection()) {
MaxwellMysqlStatus.ensureMaxwellMysqlState(rawConnection, config.vitessEnabled);

SchemaStoreSchema.ensureMaxwellSchema(rawConnection, this.config.databaseName);

Expand All @@ -256,41 +253,65 @@ private void startInner() throws Exception {

AbstractProducer producer = this.context.getProducer();

if (!config.vitessEnabled) {
try (Connection connection = context.getReplicationConnection()) {
MaxwellMysqlStatus.ensureReplicationMysqlState(connection);

if (config.gtidMode) {
MaxwellMysqlStatus.ensureGtidMysqlState(connection);
}
}
}

Position initPosition = getInitialPosition();
logBanner(producer, initPosition);
this.context.setPosition(initPosition);

MysqlSchemaStore mysqlSchemaStore = new MysqlSchemaStore(this.context, initPosition);
BootstrapController bootstrapController = this.context.getBootstrapController(mysqlSchemaStore.getSchemaID());
if (config.vitessEnabled) {
this.replicator = new VStreamReplicator(
config.vitessConfig,
producer,
context.getPositionStore(),
initPosition,
context.getMetrics(),
context.getFilter(),
config.bufferMemoryUsage,
config.binlogEventQueueSize
);
} else {
MysqlSchemaStore mysqlSchemaStore = new MysqlSchemaStore(this.context, initPosition);
BootstrapController bootstrapController = this.context
.getBootstrapController(mysqlSchemaStore.getSchemaID());

this.context.startSchemaCompactor();

this.context.startSchemaCompactor();
if (config.recaptureSchema) {
mysqlSchemaStore.captureAndSaveSchema();
}

if (config.recaptureSchema) {
mysqlSchemaStore.captureAndSaveSchema();
}
mysqlSchemaStore.getSchema(); // trigger schema to load / capture before we start the replicator.

mysqlSchemaStore.getSchema(); // trigger schema to load / capture before we start the replicator.

this.replicator = new BinlogConnectorReplicator(
mysqlSchemaStore,
producer,
bootstrapController,
config.replicationMysql,
config.replicaServerID,
config.databaseName,
context.getMetrics(),
initPosition,
false,
config.clientID,
context.getHeartbeatNotifier(),
config.scripting,
context.getFilter(),
context.getConfig().getIgnoreMissingSchema(),
config.outputConfig,
config.bufferMemoryUsage,
config.replicationReconnectionRetries,
config.binlogEventQueueSize
);
this.replicator = new BinlogConnectorReplicator(
mysqlSchemaStore,
producer,
bootstrapController,
config.replicationMysql,
config.replicaServerID,
config.databaseName,
context.getMetrics(),
initPosition,
false,
config.clientID,
context.getHeartbeatNotifier(),
config.scripting,
context.getFilter(),
context.getConfig().getIgnoreMissingSchema(),
config.outputConfig,
config.bufferMemoryUsage,
config.replicationReconnectionRetries,
config.binlogEventQueueSize
);
}

context.setReplicator(replicator);
this.context.start();
Expand Down
18 changes: 17 additions & 1 deletion src/main/java/com/zendesk/maxwell/MaxwellConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ public class MaxwellConfig extends AbstractConfig {
*/
public Boolean gtidMode;

/**
* If Maxwell is running against a vitess cluster.
*/
public Boolean vitessEnabled;

/**
* Vitess (vtgate) connection config
*/
public MaxwellVitessConfig vitessConfig;

/**
* Name of database in which to store maxwell data (default `maxwell`)
*/
Expand Down Expand Up @@ -641,6 +651,10 @@ public MaxwellConfig() { // argv is only null in tests
this.schemaMysql = new MaxwellMysqlConfig();
this.masterRecovery = false;
this.gtidMode = false;

this.vitessEnabled = false;
this.vitessConfig = new MaxwellVitessConfig();

this.bufferedProducerSize = 200;
this.outputConfig = new MaxwellOutputConfig();
setup(null, null); // setup defaults
Expand Down Expand Up @@ -1043,10 +1057,12 @@ private void setup(OptionSet options, Properties properties) {
this.replicationMysql = parseMysqlConfig("replication_", options, properties);
this.schemaMysql = parseMysqlConfig("schema_", options, properties);
this.gtidMode = fetchBooleanOption("gtid_mode", options, properties, System.getenv(GTID_MODE_ENV) != null);

this.databaseName = fetchStringOption("schema_database", options, properties, "maxwell");
this.maxwellMysql.database = this.databaseName;

this.vitessEnabled = fetchBooleanOption("vitess", options, properties, false);
this.vitessConfig = parseVitessConfig(options, properties);

this.producerFactory = fetchProducerFactory(options, properties);
this.producerType = fetchStringOption("producer", options, properties, "stdout");
this.producerAckTimeout = fetchLongOption("producer_ack_timeout", options, properties, 0L);
Expand Down
12 changes: 8 additions & 4 deletions src/main/java/com/zendesk/maxwell/MaxwellContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.zendesk.maxwell.schema.MysqlSchemaCompactor;
import com.zendesk.maxwell.schema.PositionStoreThread;
import com.zendesk.maxwell.schema.ReadOnlyMysqlPositionStore;
import com.zendesk.maxwell.schema.VitessPositionStore;
import com.zendesk.maxwell.util.C3P0ConnectionPool;
import com.zendesk.maxwell.util.RunLoopProcess;
import com.zendesk.maxwell.util.StoppableTask;
Expand Down Expand Up @@ -127,10 +128,13 @@ public MaxwellContext(MaxwellConfig config) throws SQLException, URISyntaxExcept
if ( this.config.initPosition != null )
this.initialPosition = this.config.initPosition;

if ( this.config.replayMode ) {
this.positionStore = new ReadOnlyMysqlPositionStore(this.getMaxwellConnectionPool(), this.getServerID(), this.config.clientID, config.gtidMode);
ConnectionPool cp = getMaxwellConnectionPool();
if (this.config.replayMode) {
this.positionStore = new ReadOnlyMysqlPositionStore(cp, getServerID(), config.clientID, config.gtidMode);
} else if (config.vitessEnabled) {
this.positionStore = new VitessPositionStore(cp, config.clientID);
} else {
this.positionStore = new MysqlPositionStore(this.getMaxwellConnectionPool(), this.getServerID(), this.config.clientID, config.gtidMode);
this.positionStore = new MysqlPositionStore(cp, getServerID(), config.clientID, config.gtidMode);
}

this.heartbeatNotifier = new HeartbeatNotifier();
Expand Down Expand Up @@ -255,7 +259,7 @@ public Thread terminate(Exception error) {
}

if (taskManager.requestStop()) {
if (this.error == null && this.replicator != null) {
if (this.error == null && this.replicator != null && !config.vitessEnabled) {
sendFinalHeartbeat();
}
this.terminationThread = spawnTerminateThread();
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/com/zendesk/maxwell/MaxwellMysqlStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,18 @@ public static void ensureReplicationMysqlState(Connection c) throws SQLException
/**
* Verify that the maxwell database is in the expected state
* @param c a JDBC connection
* @param vitessEnabled whether or not we are running in a vitess environment
* @throws SQLException if we have database issues
* @throws MaxwellCompatibilityError if we're not in the expected state
*/
public static void ensureMaxwellMysqlState(Connection c) throws SQLException, MaxwellCompatibilityError {
public static void ensureMaxwellMysqlState(Connection c, boolean vitessEnabled) throws SQLException, MaxwellCompatibilityError {
MaxwellMysqlStatus m = new MaxwellMysqlStatus(c);

m.ensureVariableState("read_only", "OFF");
// Vitess reports read_only=ON while running in a vttestserver and sometimes even
// in production. We need to ignore this setting when running against Vitess.
if (!vitessEnabled) {
m.ensureVariableState("read_only", "OFF");
}
}

/**
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellVitessConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.zendesk.maxwell;

public class MaxwellVitessConfig {
public String vtgateHost;
public int vtgatePort;

public String user;
public String password;

public String keyspace;
public String shard;

public boolean usePlaintext;
public String tlsCA;
public String tlsCert;
public String tlsKey;
public String tlsServerName;

public MaxwellVitessConfig() {
this.vtgateHost = "localhost";
this.vtgatePort = 15991;
this.usePlaintext = true;

this.tlsCA = null;
this.tlsCert = null;
this.tlsKey = null;
this.tlsServerName = null;

this.user = null;
this.password = null;

this.keyspace = null;
this.shard = "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ protected void processRow(RowMap row) throws Exception {
if ( row instanceof HeartbeatRowMap) {
producer.push(row);
if (stopAtHeartbeat != null) {
long thisHeartbeat = row.getPosition().getLastHeartbeatRead();
Position p = (Position) row.getPosition();
long thisHeartbeat = p.getLastHeartbeatRead();
if (thisHeartbeat >= stopAtHeartbeat) {
LOGGER.info("received final heartbeat " + thisHeartbeat + "; stopping replicator");
// terminate runLoop
Expand Down
75 changes: 75 additions & 0 deletions src/main/java/com/zendesk/maxwell/replication/VStreamObserver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.zendesk.maxwell.replication;

import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import binlogdata.Binlogdata.VEvent;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.vitess.proto.Vtgate;

///
// Observes the VStream response stream, extracts events from VStream responses,
// then passes them to the VStreamReplicator via a queue for processing.
//
public class VStreamObserver implements StreamObserver<Vtgate.VStreamResponse> {
private static final Logger LOGGER = LoggerFactory.getLogger(VStreamObserver.class);
private final AtomicBoolean mustStop = new AtomicBoolean(false);
private final LinkedBlockingDeque<VEvent> queue;
private Exception lastException = null;

public VStreamObserver(LinkedBlockingDeque<VEvent> queue) {
this.queue = queue;
}

// Shuts down the observer
public void stop() {
mustStop.set(true);
}

@Override
public void onNext(Vtgate.VStreamResponse response) {
LOGGER.debug("Received {} VEvents in the VStreamResponse:", response.getEventsCount());

List<VEvent> messageEvents = response.getEventsList();
for (VEvent event : messageEvents) {
LOGGER.trace("VEvent: {}", event);
enqueueEvent(event);
}
}

@Override
public void onError(Throwable t) {
this.lastException = Status.fromThrowable(t).asException();
LOGGER.error("VStream streaming onError. Status: {}", lastException);
stop();
}

@Override
public void onCompleted() {
LOGGER.info("VStream streaming completed.");
stop();
}

public Exception getLastException() {
return lastException;
}

// Pushes an event to the queue for VStreamReplicator to process.
private void enqueueEvent(VEvent event) {
while (mustStop.get() != true) {
try {
if (queue.offer(event, 100, TimeUnit.MILLISECONDS)) {
break;
}
} catch (InterruptedException e) {
return;
}
}
}
}
Loading