diff --git a/pom.xml b/pom.xml
index 6d7959b0e..36d2e9c73 100644
--- a/pom.xml
+++ b/pom.xml
@@ -178,6 +178,12 @@
15.3
+
+ io.vitess
+ vitess-grpc-client
+ 14.0.1
+
+
org.apache.logging.log4j
@@ -291,7 +297,6 @@
jedis
3.5.1
-
io.dropwizard.metrics
diff --git a/src/main/java/com/zendesk/maxwell/Maxwell.java b/src/main/java/com/zendesk/maxwell/Maxwell.java
index 7cc17a2f8..5d0c904c7 100644
--- a/src/main/java/com/zendesk/maxwell/Maxwell.java
+++ b/src/main/java/com/zendesk/maxwell/Maxwell.java
@@ -9,6 +9,7 @@
import com.zendesk.maxwell.replication.BinlogConnectorReplicator;
import com.zendesk.maxwell.replication.Position;
import com.zendesk.maxwell.replication.Replicator;
+import com.zendesk.maxwell.replication.VStreamReplicator;
import com.zendesk.maxwell.row.HeartbeatRowMap;
import com.zendesk.maxwell.schema.*;
import com.zendesk.maxwell.schema.columndef.ColumnDefCastException;
@@ -174,7 +175,7 @@ protected Position getInitialPosition() throws Exception {
}
/* fourth method: capture the current master position. */
- if ( initial == null ) {
+ if ( initial == null && !config.vitessEnabled ) {
try ( Connection c = context.getReplicationConnection() ) {
initial = Position.capture(c, config.gtidMode);
}
@@ -203,7 +204,8 @@ public String getMaxwellVersion() {
static String bootString = "Maxwell v%s is booting (%s), starting at %s";
private void logBanner(AbstractProducer producer, Position initialPosition) {
String producerName = producer.getClass().getSimpleName();
- LOGGER.info(String.format(bootString, getMaxwellVersion(), producerName, initialPosition.toString()));
+ String position = initialPosition != null ? initialPosition.toString() : "the latest position";
+ LOGGER.info(String.format(bootString, getMaxwellVersion(), producerName, position));
}
/**
@@ -239,13 +241,8 @@ public void start() throws Exception {
}
private void startInner() throws Exception {
- try ( Connection connection = this.context.getReplicationConnection();
- Connection rawConnection = this.context.getRawMaxwellConnection() ) {
- MaxwellMysqlStatus.ensureReplicationMysqlState(connection);
- MaxwellMysqlStatus.ensureMaxwellMysqlState(rawConnection);
- if (config.gtidMode) {
- MaxwellMysqlStatus.ensureGtidMysqlState(connection);
- }
+ try (Connection rawConnection = this.context.getRawMaxwellConnection()) {
+ MaxwellMysqlStatus.ensureMaxwellMysqlState(rawConnection, config.vitessEnabled);
SchemaStoreSchema.ensureMaxwellSchema(rawConnection, this.config.databaseName);
@@ -256,41 +253,65 @@ private void startInner() throws Exception {
AbstractProducer producer = this.context.getProducer();
+ if (!config.vitessEnabled) {
+ try (Connection connection = context.getReplicationConnection()) {
+ MaxwellMysqlStatus.ensureReplicationMysqlState(connection);
+
+ if (config.gtidMode) {
+ MaxwellMysqlStatus.ensureGtidMysqlState(connection);
+ }
+ }
+ }
+
Position initPosition = getInitialPosition();
logBanner(producer, initPosition);
this.context.setPosition(initPosition);
- MysqlSchemaStore mysqlSchemaStore = new MysqlSchemaStore(this.context, initPosition);
- BootstrapController bootstrapController = this.context.getBootstrapController(mysqlSchemaStore.getSchemaID());
+ if (config.vitessEnabled) {
+ this.replicator = new VStreamReplicator(
+ config.vitessConfig,
+ producer,
+ context.getPositionStore(),
+ initPosition,
+ context.getMetrics(),
+ context.getFilter(),
+ config.bufferMemoryUsage,
+ config.binlogEventQueueSize
+ );
+ } else {
+ MysqlSchemaStore mysqlSchemaStore = new MysqlSchemaStore(this.context, initPosition);
+ BootstrapController bootstrapController = this.context
+ .getBootstrapController(mysqlSchemaStore.getSchemaID());
+
+ this.context.startSchemaCompactor();
- this.context.startSchemaCompactor();
+ if (config.recaptureSchema) {
+ mysqlSchemaStore.captureAndSaveSchema();
+ }
- if (config.recaptureSchema) {
- mysqlSchemaStore.captureAndSaveSchema();
- }
+ mysqlSchemaStore.getSchema(); // trigger schema to load / capture before we start the replicator.
- mysqlSchemaStore.getSchema(); // trigger schema to load / capture before we start the replicator.
-
- this.replicator = new BinlogConnectorReplicator(
- mysqlSchemaStore,
- producer,
- bootstrapController,
- config.replicationMysql,
- config.replicaServerID,
- config.databaseName,
- context.getMetrics(),
- initPosition,
- false,
- config.clientID,
- context.getHeartbeatNotifier(),
- config.scripting,
- context.getFilter(),
- context.getConfig().getIgnoreMissingSchema(),
- config.outputConfig,
- config.bufferMemoryUsage,
- config.replicationReconnectionRetries,
- config.binlogEventQueueSize
- );
+ this.replicator = new BinlogConnectorReplicator(
+ mysqlSchemaStore,
+ producer,
+ bootstrapController,
+ config.replicationMysql,
+ config.replicaServerID,
+ config.databaseName,
+ context.getMetrics(),
+ initPosition,
+ false,
+ config.clientID,
+ context.getHeartbeatNotifier(),
+ config.scripting,
+ context.getFilter(),
+ context.getConfig().getIgnoreMissingSchema(),
+ config.outputConfig,
+ config.bufferMemoryUsage,
+ config.replicationReconnectionRetries,
+ config.binlogEventQueueSize
+ );
+ }
context.setReplicator(replicator);
this.context.start();
diff --git a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java
index 1cac4be78..5366a4a67 100644
--- a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java
+++ b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java
@@ -81,6 +81,16 @@ public class MaxwellConfig extends AbstractConfig {
*/
public Boolean gtidMode;
+ /**
+ * If Maxwell is running against a vitess cluster.
+ */
+ public Boolean vitessEnabled;
+
+ /**
+ * Vitess (vtgate) connection config
+ */
+ public MaxwellVitessConfig vitessConfig;
+
/**
* Name of database in which to store maxwell data (default `maxwell`)
*/
@@ -641,6 +651,10 @@ public MaxwellConfig() { // argv is only null in tests
this.schemaMysql = new MaxwellMysqlConfig();
this.masterRecovery = false;
this.gtidMode = false;
+
+ this.vitessEnabled = false;
+ this.vitessConfig = new MaxwellVitessConfig();
+
this.bufferedProducerSize = 200;
this.outputConfig = new MaxwellOutputConfig();
setup(null, null); // setup defaults
@@ -1043,10 +1057,12 @@ private void setup(OptionSet options, Properties properties) {
this.replicationMysql = parseMysqlConfig("replication_", options, properties);
this.schemaMysql = parseMysqlConfig("schema_", options, properties);
this.gtidMode = fetchBooleanOption("gtid_mode", options, properties, System.getenv(GTID_MODE_ENV) != null);
-
this.databaseName = fetchStringOption("schema_database", options, properties, "maxwell");
this.maxwellMysql.database = this.databaseName;
+ this.vitessEnabled = fetchBooleanOption("vitess", options, properties, false);
+ this.vitessConfig = parseVitessConfig(options, properties);
+
this.producerFactory = fetchProducerFactory(options, properties);
this.producerType = fetchStringOption("producer", options, properties, "stdout");
this.producerAckTimeout = fetchLongOption("producer_ack_timeout", options, properties, 0L);
diff --git a/src/main/java/com/zendesk/maxwell/MaxwellContext.java b/src/main/java/com/zendesk/maxwell/MaxwellContext.java
index 1da30273f..8a493470b 100644
--- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java
+++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java
@@ -14,6 +14,7 @@
import com.zendesk.maxwell.schema.MysqlSchemaCompactor;
import com.zendesk.maxwell.schema.PositionStoreThread;
import com.zendesk.maxwell.schema.ReadOnlyMysqlPositionStore;
+import com.zendesk.maxwell.schema.VitessPositionStore;
import com.zendesk.maxwell.util.C3P0ConnectionPool;
import com.zendesk.maxwell.util.RunLoopProcess;
import com.zendesk.maxwell.util.StoppableTask;
@@ -127,10 +128,13 @@ public MaxwellContext(MaxwellConfig config) throws SQLException, URISyntaxExcept
if ( this.config.initPosition != null )
this.initialPosition = this.config.initPosition;
- if ( this.config.replayMode ) {
- this.positionStore = new ReadOnlyMysqlPositionStore(this.getMaxwellConnectionPool(), this.getServerID(), this.config.clientID, config.gtidMode);
+ ConnectionPool cp = getMaxwellConnectionPool();
+ if (this.config.replayMode) {
+ this.positionStore = new ReadOnlyMysqlPositionStore(cp, getServerID(), config.clientID, config.gtidMode);
+ } else if (config.vitessEnabled) {
+ this.positionStore = new VitessPositionStore(cp, config.clientID);
} else {
- this.positionStore = new MysqlPositionStore(this.getMaxwellConnectionPool(), this.getServerID(), this.config.clientID, config.gtidMode);
+ this.positionStore = new MysqlPositionStore(cp, getServerID(), config.clientID, config.gtidMode);
}
this.heartbeatNotifier = new HeartbeatNotifier();
@@ -255,7 +259,7 @@ public Thread terminate(Exception error) {
}
if (taskManager.requestStop()) {
- if (this.error == null && this.replicator != null) {
+ if (this.error == null && this.replicator != null && !config.vitessEnabled) {
sendFinalHeartbeat();
}
this.terminationThread = spawnTerminateThread();
diff --git a/src/main/java/com/zendesk/maxwell/MaxwellMysqlStatus.java b/src/main/java/com/zendesk/maxwell/MaxwellMysqlStatus.java
index 486cf23c8..ab04e6cd8 100644
--- a/src/main/java/com/zendesk/maxwell/MaxwellMysqlStatus.java
+++ b/src/main/java/com/zendesk/maxwell/MaxwellMysqlStatus.java
@@ -109,13 +109,18 @@ public static void ensureReplicationMysqlState(Connection c) throws SQLException
/**
* Verify that the maxwell database is in the expected state
* @param c a JDBC connection
+ * @param vitessEnabled whether or not we are running in a vitess environment
* @throws SQLException if we have database issues
* @throws MaxwellCompatibilityError if we're not in the expected state
*/
- public static void ensureMaxwellMysqlState(Connection c) throws SQLException, MaxwellCompatibilityError {
+ public static void ensureMaxwellMysqlState(Connection c, boolean vitessEnabled) throws SQLException, MaxwellCompatibilityError {
MaxwellMysqlStatus m = new MaxwellMysqlStatus(c);
- m.ensureVariableState("read_only", "OFF");
+ // Vitess reports read_only=ON while running in a vttestserver and sometimes even
+ // in production. We need to ignore this setting when running against Vitess.
+ if (!vitessEnabled) {
+ m.ensureVariableState("read_only", "OFF");
+ }
}
/**
diff --git a/src/main/java/com/zendesk/maxwell/MaxwellVitessConfig.java b/src/main/java/com/zendesk/maxwell/MaxwellVitessConfig.java
new file mode 100644
index 000000000..fcb93a6f7
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/MaxwellVitessConfig.java
@@ -0,0 +1,35 @@
+package com.zendesk.maxwell;
+
+public class MaxwellVitessConfig {
+ public String vtgateHost;
+ public int vtgatePort;
+
+ public String user;
+ public String password;
+
+ public String keyspace;
+ public String shard;
+
+ public boolean usePlaintext;
+ public String tlsCA;
+ public String tlsCert;
+ public String tlsKey;
+ public String tlsServerName;
+
+ public MaxwellVitessConfig() {
+ this.vtgateHost = "localhost";
+ this.vtgatePort = 15991;
+ this.usePlaintext = true;
+
+ this.tlsCA = null;
+ this.tlsCert = null;
+ this.tlsKey = null;
+ this.tlsServerName = null;
+
+ this.user = null;
+ this.password = null;
+
+ this.keyspace = null;
+ this.shard = "";
+ }
+}
diff --git a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java
index 9eb5a62fc..d19f35d81 100644
--- a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java
+++ b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java
@@ -338,7 +338,8 @@ protected void processRow(RowMap row) throws Exception {
if ( row instanceof HeartbeatRowMap) {
producer.push(row);
if (stopAtHeartbeat != null) {
- long thisHeartbeat = row.getPosition().getLastHeartbeatRead();
+ Position p = (Position) row.getPosition();
+ long thisHeartbeat = p.getLastHeartbeatRead();
if (thisHeartbeat >= stopAtHeartbeat) {
LOGGER.info("received final heartbeat " + thisHeartbeat + "; stopping replicator");
// terminate runLoop
diff --git a/src/main/java/com/zendesk/maxwell/replication/VStreamObserver.java b/src/main/java/com/zendesk/maxwell/replication/VStreamObserver.java
new file mode 100644
index 000000000..96fa5412c
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/replication/VStreamObserver.java
@@ -0,0 +1,75 @@
+package com.zendesk.maxwell.replication;
+
+import java.util.List;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import binlogdata.Binlogdata.VEvent;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import io.vitess.proto.Vtgate;
+
+///
+// Observes the VStream response stream, extracts events from VStream responses,
+// then passes them to the VStreamReplicator via a queue for processing.
+//
+public class VStreamObserver implements StreamObserver {
+ private static final Logger LOGGER = LoggerFactory.getLogger(VStreamObserver.class);
+ private final AtomicBoolean mustStop = new AtomicBoolean(false);
+ private final LinkedBlockingDeque queue;
+ private Exception lastException = null;
+
+ public VStreamObserver(LinkedBlockingDeque queue) {
+ this.queue = queue;
+ }
+
+ // Shuts down the observer
+ public void stop() {
+ mustStop.set(true);
+ }
+
+ @Override
+ public void onNext(Vtgate.VStreamResponse response) {
+ LOGGER.debug("Received {} VEvents in the VStreamResponse:", response.getEventsCount());
+
+ List messageEvents = response.getEventsList();
+ for (VEvent event : messageEvents) {
+ LOGGER.trace("VEvent: {}", event);
+ enqueueEvent(event);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ this.lastException = Status.fromThrowable(t).asException();
+ LOGGER.error("VStream streaming onError. Status: {}", lastException);
+ stop();
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.info("VStream streaming completed.");
+ stop();
+ }
+
+ public Exception getLastException() {
+ return lastException;
+ }
+
+ // Pushes an event to the queue for VStreamReplicator to process.
+ private void enqueueEvent(VEvent event) {
+ while (mustStop.get() != true) {
+ try {
+ if (queue.offer(event, 100, TimeUnit.MILLISECONDS)) {
+ break;
+ }
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/zendesk/maxwell/replication/VStreamReplicator.java b/src/main/java/com/zendesk/maxwell/replication/VStreamReplicator.java
new file mode 100644
index 000000000..36d723682
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/replication/VStreamReplicator.java
@@ -0,0 +1,510 @@
+package com.zendesk.maxwell.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.zendesk.maxwell.MaxwellVitessConfig;
+import com.zendesk.maxwell.errors.DuplicateProcessException;
+import com.zendesk.maxwell.filtering.Filter;
+import com.zendesk.maxwell.monitoring.Metrics;
+import com.zendesk.maxwell.producer.AbstractProducer;
+import com.zendesk.maxwell.replication.vitess.ReplicationMessageColumn;
+import com.zendesk.maxwell.replication.vitess.Vgtid;
+import com.zendesk.maxwell.replication.vitess.VitessSchema;
+import com.zendesk.maxwell.replication.vitess.VitessTable;
+import com.zendesk.maxwell.row.RowMap;
+import com.zendesk.maxwell.row.RowMapBuffer;
+import com.zendesk.maxwell.schema.MysqlPositionStore;
+import com.zendesk.maxwell.schema.Schema;
+import com.zendesk.maxwell.schema.SchemaStoreException;
+import com.zendesk.maxwell.schema.VitessPositionStore;
+import com.zendesk.maxwell.util.RunLoopProcess;
+
+import binlogdata.Binlogdata.FieldEvent;
+import binlogdata.Binlogdata.RowChange;
+import binlogdata.Binlogdata.RowEvent;
+import binlogdata.Binlogdata.VEvent;
+import binlogdata.Binlogdata.VEventType;
+import binlogdata.Binlogdata.VGtid;
+import io.grpc.ChannelCredentials;
+import io.grpc.Grpc;
+import io.grpc.InsecureChannelCredentials;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.TlsChannelCredentials;
+import io.vitess.proto.Vtgate.VStreamFlags;
+import io.vitess.proto.Vtgate.VStreamRequest;
+import io.vitess.proto.grpc.VitessGrpc;
+import io.vitess.client.grpc.StaticAuthCredentials;
+import io.vitess.proto.Topodata;
+import io.vitess.proto.Query.Row;
+
+public class VStreamReplicator extends RunLoopProcess implements Replicator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(VStreamReplicator.class);
+
+ private static final int GRPC_MAX_INBOUND_MESSAGE_SIZE = 4 * 1024 * 1024;
+ public static final int KEEPALIVE_INTERVAL_SECONDS = 60;
+ public static final int HEARTBEAT_INTERVAL_SECONDS = 30;
+
+ private static final long MAX_TX_ELEMENTS = 10000;
+
+ private static final String INSERT_TYPE = "INSERT";
+ private static final String UPDATE_TYPE = "UPDATE";
+ private static final String DELETE_TYPE = "DELETE";
+
+ private static final String MAXWELL_USER_AGENT = "Maxwell's Daemon";
+
+ private final MaxwellVitessConfig vitessConfig;
+ private final AbstractProducer producer;
+ private final VitessPosition initPosition;
+ private final VitessPositionStore positionStore;
+ private RowMapBuffer rowBuffer;
+ private final float bufferMemoryUsage;
+
+ private ManagedChannel channel;
+ private VStreamObserver responseObserver;
+ private boolean replicatorStarted = false;
+
+ private final LinkedBlockingDeque queue;
+ private final VitessSchema vitessSchema = new VitessSchema();
+ private final Filter filter;
+
+ private final Counter rowCounter;
+ private final Meter rowMeter;
+ private final Histogram transactionRowCount;
+ private final Histogram transactionExecutionTime;
+
+ public VStreamReplicator(
+ MaxwellVitessConfig vitessConfig,
+ AbstractProducer producer,
+ MysqlPositionStore positionStore,
+ Position initPosition,
+ Metrics metrics,
+ Filter filter,
+ Float bufferMemoryUsage,
+ int binlogEventQueueSize) {
+ this.vitessConfig = vitessConfig;
+ this.producer = producer;
+ this.initPosition = (VitessPosition) initPosition;
+ this.positionStore = (VitessPositionStore) positionStore;
+ this.queue = new LinkedBlockingDeque<>(binlogEventQueueSize);
+ this.filter = filter;
+ this.bufferMemoryUsage = bufferMemoryUsage;
+
+ /* setup metrics */
+ MetricRegistry mr = metrics.getRegistry();
+ rowCounter = mr.counter(metrics.metricName("row", "count"));
+ rowMeter = mr.meter(metrics.metricName("row", "meter"));
+ transactionRowCount = mr.histogram(metrics.metricName("transaction", "row_count"));
+ transactionExecutionTime = mr.histogram(metrics.metricName("transaction", "execution_time"));
+ }
+
+ public void startReplicator() throws Exception {
+ LOGGER.info("Starting VStreamReplicator, connecting to Vtgate at {}:{}",
+ vitessConfig.vtgateHost, vitessConfig.vtgatePort);
+
+ this.channel = newChannel(vitessConfig, GRPC_MAX_INBOUND_MESSAGE_SIZE);
+
+ VitessGrpc.VitessStub stub = VitessGrpc.newStub(channel);
+ if (vitessConfig.user != null && vitessConfig.password != null) {
+ LOGGER.info("Using provided credentials for Vtgate grpc calls");
+ stub = stub.withCallCredentials(
+ new StaticAuthCredentials(vitessConfig.user, vitessConfig.password)
+ );
+ }
+
+ VStreamFlags vStreamFlags = VStreamFlags.newBuilder()
+ .setStopOnReshard(true)
+ .setHeartbeatInterval(HEARTBEAT_INTERVAL_SECONDS)
+ .build();
+
+ VStreamRequest vstreamRequest = VStreamRequest.newBuilder()
+ .setVgtid(initialVgtid())
+ .setTabletType(Topodata.TabletType.MASTER)
+ .setFlags(vStreamFlags)
+ .build();
+
+ this.responseObserver = new VStreamObserver(queue);
+ stub.vStream(vstreamRequest, responseObserver);
+
+ this.replicatorStarted = true;
+ LOGGER.info("Started VStream");
+ }
+
+ @Override
+ protected void beforeStop() throws Exception {
+ responseObserver.stop();
+ responseObserver.onCompleted();
+
+ channel.shutdown();
+ channel.awaitTermination(500, TimeUnit.MILLISECONDS);
+ channel.shutdownNow();
+ }
+
+ /**
+ * get a single row from the replicator and pass it to the producer or
+ * bootstrapper.
+ *
+ * This is the top-level function in the run-loop.
+ */
+ @Override
+ public void work() throws Exception {
+ RowMap row = null;
+ try {
+ row = getRow();
+ } catch (InterruptedException e) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Interrupted while waiting for row");
+ }
+ }
+
+ if (row == null) {
+ checkIfVstreamIsAlive();
+ return;
+ }
+
+ rowCounter.inc();
+ rowMeter.mark();
+
+ // if ( scripting != null && !isMaxwellRow(row))
+ // scripting.invoke(row);
+
+ processRow(row);
+ }
+
+ private void checkIfVstreamIsAlive() {
+ if (!replicatorStarted) return;
+
+ Exception lastException = responseObserver.getLastException();
+ if (lastException != null) {
+ LOGGER.error("VStream is dead, stopping...");
+ throw new RuntimeException(lastException);
+ }
+ }
+
+ public RowMap getRow() throws Exception {
+ if (!replicatorStarted) {
+ LOGGER.warn("replicator was not started, calling startReplicator()...");
+ startReplicator();
+ }
+
+ while (true) {
+ if (rowBuffer != null && !rowBuffer.isEmpty()) {
+ return rowBuffer.removeFirst();
+ }
+
+ VEvent event = pollEvent();
+ if (event == null) {
+ return null;
+ }
+
+ if (event.getType() == VEventType.BEGIN) {
+ rowBuffer = getTransactionRows(event);
+ } else {
+ processServiceEvent(event);
+ }
+ }
+ }
+
+ private VGtid initialVgtid() {
+ Vgtid initialVgtid;
+ if (initPosition != null) {
+ initialVgtid = initPosition.getVgtid();
+ } else {
+ Vgtid.ShardGtid shardGtid = new Vgtid.ShardGtid(vitessConfig.keyspace, vitessConfig.shard, "current");
+ initialVgtid = Vgtid.of(List.of(shardGtid));
+ }
+
+ LOGGER.debug("Setting the initial vgtid for the stream to {}", initialVgtid);
+ return initialVgtid.getRawVgtid();
+ }
+
+ private void processRow(RowMap row) throws Exception {
+ producer.push(row);
+ }
+
+ private VEvent pollEvent() throws InterruptedException {
+ return queue.poll(100, TimeUnit.MILLISECONDS);
+ }
+
+ private void processFieldEvent(VEvent event) {
+ FieldEvent fieldEvent = event.getFieldEvent();
+ LOGGER.debug("Received field event: {}", fieldEvent);
+ vitessSchema.processFieldEvent(fieldEvent);
+ }
+
+ private VitessPosition processVGtidEvent(VEvent event) {
+ LOGGER.debug("Received GTID event: {}", event);
+ return new VitessPosition(Vgtid.of(event.getVgtid()));
+ }
+
+ /**
+ * Get a batch of rows for the current transaction.
+ *
+ * We assume the replicator has just processed a "BEGIN" event, and now
+ * we're inside a transaction. We'll process all rows inside that transaction
+ * and turn them into RowMap objects.
+ *
+ * We do this because we want to attach all rows within
+ * the transaction the same transaction id value (xid, which we generate
+ * ourselves since VStream
+ * does not expose underlying transaction ids to the consumer).
+ *
+ * @return A RowMapBuffer of rows; either in-memory or on disk.
+ */
+ private RowMapBuffer getTransactionRows(VEvent beginEvent) throws Exception {
+ RowMapBuffer buffer = new RowMapBuffer(MAX_TX_ELEMENTS, this.bufferMemoryUsage);
+
+ // Since transactions in VStream do not have an XID value, we generate one
+ long xid = System.currentTimeMillis() * 1000 + Math.abs(beginEvent.hashCode()) % 1000;
+ LOGGER.trace("Generated transaction id: {}", xid);
+ buffer.setXid(xid);
+
+ // Since specific VStream events do not provide the VGTID, we capture it from
+ // VGTID events present in each transaction.
+ VitessPosition latestPosition = null;
+
+ while (true) {
+ final VEvent event = pollEvent();
+ if (event == null) {
+ continue;
+ }
+
+ final VEventType eventType = event.getType();
+
+ if (eventType == VEventType.VGTID) {
+ latestPosition = processVGtidEvent(event);
+ continue;
+ }
+
+ if (eventType == VEventType.COMMIT) {
+ LOGGER.trace("Received COMMIT event");
+ if (!buffer.isEmpty()) {
+ // Set TX flag and the position on the last row in the transaction
+ RowMap lastEvent = buffer.getLast();
+ if (latestPosition != null) {
+ lastEvent.setNextPosition(latestPosition);
+ } else {
+ throw new RuntimeException("VGTID is null for transaction");
+ }
+ lastEvent.setTXCommit();
+
+ long timeSpent = buffer.getLast().getTimestampMillis() - beginEvent.getTimestamp();
+ transactionExecutionTime.update(timeSpent);
+ transactionRowCount.update(buffer.size());
+ }
+ return buffer;
+ }
+
+ if (eventType == VEventType.ROW) {
+ List eventRows = rowEventToMaps(event, xid);
+ for (RowMap r : eventRows) {
+ // if (shouldOutputRowMap(table.getDatabase(), table.getName(), r, filter)) {
+ buffer.add(r);
+ }
+ continue;
+ }
+
+ // All other events are service events delivering the schema, GTID values, etc.
+ processServiceEvent(event);
+ }
+ }
+
+ private void processServiceEvent(VEvent event) throws SQLException, DuplicateProcessException {
+ final VEventType eventType = event.getType();
+ switch (eventType) {
+ case FIELD:
+ processFieldEvent(event);
+ break;
+
+ case HEARTBEAT:
+ LOGGER.debug("Received heartbeat from vtgate: {}", event);
+ break;
+
+ case VGTID:
+ // Use an initial VGTID event received after connecting to vtgate as for setting
+ // the initial position of the stream.
+ if (initPosition == null) {
+ VitessPosition position = processVGtidEvent(event);
+ LOGGER.info("Current VGTID event received, using it for initial positioning at {}", position);
+ positionStore.set(position);
+ } else {
+ LOGGER.warn("Ignoring a standalone VGTID event, we already have an initial position: {}", event);
+ }
+ break;
+
+ case ROW:
+ case BEGIN:
+ case COMMIT:
+ LOGGER.error("Unexpected event outside of a transaction, skipping: {}", event);
+ break;
+
+ default:
+ LOGGER.debug("Unsupported service event: {}", event);
+ break;
+ }
+ }
+
+ private List rowEventToMaps(VEvent event, long xid) {
+ Long timestampMillis = event.getCurrentTime();
+ RowEvent rowEvent = event.getRowEvent();
+ String qualifiedTableName = rowEvent.getTableName();
+
+ List rowMaps = new ArrayList<>(rowEvent.getRowChangesCount());
+ for (RowChange rowChange : rowEvent.getRowChangesList()) {
+ String changeType = rowChangeToMaxwellType(rowChange);
+ final VitessTable table = resolveTable(qualifiedTableName);
+ if (!filter.includes(table.getSchemaName(), table.getTableName())) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Filtering out event for table {}.{}",
+ table.getSchemaName(), table.getTableName());
+ }
+ continue;
+ }
+
+ RowMap rowMap = new RowMap(
+ changeType,
+ table.getSchemaName(),
+ table.getTableName(),
+ timestampMillis,
+ table.getPkColumns(),
+ null,
+ null,
+ null
+ );
+
+ rowMap.setXid(xid);
+
+ // Copy column values to the row map, use the new values when available, otherwise use the old ones
+
+ Row row = rowChange.hasAfter() ? rowChange.getAfter() : rowChange.getBefore();
+ List afterColumns = table.resolveColumnsFromRow(row);
+ for (ReplicationMessageColumn column : afterColumns) {
+ rowMap.putData(column.getName(), column.getValue());
+ }
+
+ // Copy old values to the row map for cases when we have both the old and the new values
+ if (changeType.equals(UPDATE_TYPE)) {
+ Row beforeRow = rowChange.getBefore();
+ List beforeColumns = table.resolveColumnsFromRow(beforeRow);
+ for (ReplicationMessageColumn column : beforeColumns) {
+ rowMap.putOldData(column.getName(), column.getValue());
+ }
+ }
+
+ rowMaps.add(rowMap);
+ }
+
+ return rowMaps;
+ }
+
+ private String rowChangeToMaxwellType(RowChange change) {
+ if (change.hasAfter() && !change.hasBefore()) {
+ return INSERT_TYPE;
+ }
+ if (change.hasAfter() && change.hasBefore()) {
+ return UPDATE_TYPE;
+ }
+ if (!change.hasAfter() && change.hasBefore()) {
+ return DELETE_TYPE;
+ }
+
+ throw new RuntimeException("Invalid row change: " + change);
+ }
+
+ private VitessTable resolveTable(String qualifiedTableName) {
+ VitessTable table = vitessSchema.getTable(qualifiedTableName);
+ if (table == null) {
+ throw new RuntimeException("Table not found in the schema: " + qualifiedTableName);
+ }
+ return table;
+ }
+
+ @Override
+ public Long getLastHeartbeatRead() {
+ throw new RuntimeException("Heartbeat support is not available in Vitess replicator");
+ }
+
+ @Override
+ public void stopAtHeartbeat(long heartbeat) {
+ throw new RuntimeException("Heartbeat support is not available in Vitess replicator");
+ }
+
+ @Override
+ public Schema getSchema() throws SchemaStoreException {
+ return null;
+ }
+
+ @Override
+ public Long getSchemaId() throws SchemaStoreException {
+ return null;
+ }
+
+
+ private static ManagedChannel newChannel(MaxwellVitessConfig config, int maxInboundMessageSize) throws IOException {
+ ChannelCredentials channelCredentials = InsecureChannelCredentials.create();
+
+ if (!config.usePlaintext) {
+ TlsChannelCredentials.Builder tlsCredentialsBuilder = TlsChannelCredentials.newBuilder();
+
+ if (config.tlsCA != null) {
+ LOGGER.info("Using a custom TLS CA: {}", config.tlsCA);
+ tlsCredentialsBuilder.trustManager(new File(config.tlsCA));
+ }
+
+ if (config.tlsCert != null && config.tlsKey != null) {
+ LOGGER.info("TLS client credentials: cert={}, key={}", config.tlsCert, config.tlsKey);
+ ensurePkcs8(config.tlsKey);
+ tlsCredentialsBuilder.keyManager(new File(config.tlsCert), new File(config.tlsKey));
+ }
+
+ channelCredentials = tlsCredentialsBuilder.build();
+ }
+
+ ManagedChannelBuilder> builder = Grpc.newChannelBuilderForAddress(config.vtgateHost, config.vtgatePort, channelCredentials)
+ .maxInboundMessageSize(maxInboundMessageSize)
+ .keepAliveTime(KEEPALIVE_INTERVAL_SECONDS, TimeUnit.SECONDS)
+ .userAgent(MAXWELL_USER_AGENT);
+
+ if (config.usePlaintext) {
+ LOGGER.warn("Using plaintext connection to vtgate");
+ }
+
+ if (config.tlsServerName != null) {
+ LOGGER.info("Using TLS server name override: {}", config.tlsServerName);
+ builder.overrideAuthority(config.tlsServerName);
+ }
+
+ return builder.build();
+ }
+
+ // Makes sure the given private key file is in PKCS#8 format.
+ private static void ensurePkcs8(String keyFile) {
+ try {
+ Path path = Paths.get(keyFile);
+ String keyContent = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
+ if (!keyContent.contains("BEGIN PRIVATE KEY")) {
+ LOGGER.error("Private key file {} is not in PKCS#8 format, please convert it", keyFile);
+ throw new RuntimeException("Private key file is not in PKCS#8 format");
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read private key file: " + keyFile, e);
+ }
+ }
+}
diff --git a/src/main/java/com/zendesk/maxwell/replication/VitessPosition.java b/src/main/java/com/zendesk/maxwell/replication/VitessPosition.java
new file mode 100644
index 000000000..4e77acd5d
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/replication/VitessPosition.java
@@ -0,0 +1,50 @@
+package com.zendesk.maxwell.replication;
+
+import com.zendesk.maxwell.replication.vitess.Vgtid;
+
+public class VitessPosition extends Position {
+ private final Vgtid vgtid;
+
+ public VitessPosition(Vgtid vgtid) {
+ super(null, 0L);
+ this.vgtid = vgtid;
+ }
+
+ public static VitessPosition valueOf(Vgtid vgtid) {
+ return new VitessPosition(vgtid);
+ }
+
+ public Vgtid getVgtid() {
+ return vgtid;
+ }
+
+ @Override
+ public String toString() {
+ return "Position[" + vgtid + "]";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof VitessPosition) {
+ VitessPosition other = (VitessPosition) o;
+ return vgtid.equals(other.vgtid);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return vgtid.hashCode();
+ }
+
+ public boolean newerThan(Position other) {
+ if (other instanceof VitessPosition) {
+ // FIXME: Implement actual newerThan comparison for Vgtid values
+ // For now just check if it is different to avoid persisting the same position
+ // multiple times
+ VitessPosition vOther = (VitessPosition) other;
+ return !vgtid.equals(vOther.vgtid);
+ }
+ return true;
+ }
+}
diff --git a/src/main/java/com/zendesk/maxwell/replication/vitess/ColumnMetaData.java b/src/main/java/com/zendesk/maxwell/replication/vitess/ColumnMetaData.java
new file mode 100644
index 000000000..2ed678771
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/replication/vitess/ColumnMetaData.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.zendesk.maxwell.replication.vitess;
+
+/**
+ * It maps the VStream FIELD to a relational column. A list of ColumnMetaData
+ * can be used to create
+ * a {@link VitessTable}.
+ */
+public class ColumnMetaData {
+ private final String columnName;
+ private final VitessType vitessType;
+ private final boolean optional;
+ private final KeyMetaData keyMetaData;
+
+ public ColumnMetaData(String columnName, VitessType vitessType, boolean optional, KeyMetaData keyMetaData) {
+ this.columnName = columnName.intern();
+ this.vitessType = vitessType;
+ this.keyMetaData = keyMetaData;
+ this.optional = optional;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public VitessType getVitessType() {
+ return vitessType;
+ }
+
+ public boolean isOptional() {
+ return optional;
+ }
+
+ public KeyMetaData getKeyMetaData() {
+ return keyMetaData;
+ }
+}
diff --git a/src/main/java/com/zendesk/maxwell/replication/vitess/KeyMetaData.java b/src/main/java/com/zendesk/maxwell/replication/vitess/KeyMetaData.java
new file mode 100644
index 000000000..8c5d4606c
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/replication/vitess/KeyMetaData.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.zendesk.maxwell.replication.vitess;
+
+/**
+ * If a column is part of the primary key (including multi-column primary key),
+ * or a column is a single-column unique key
+ */
+public enum KeyMetaData {
+ /**
+ * The column is part of the primary key (including multi-column primary key)
+ */
+ IS_KEY,
+ /**
+ * The column is single-column unique key
+ */
+ IS_UNIQUE_KEY,
+ /**
+ * The column is not part of any key, or the column is part of multi-column
+ * unique key
+ */
+ NONE
+}
diff --git a/src/main/java/com/zendesk/maxwell/replication/vitess/ReplicationMessageColumn.java b/src/main/java/com/zendesk/maxwell/replication/vitess/ReplicationMessageColumn.java
new file mode 100644
index 000000000..c24f09f68
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/replication/vitess/ReplicationMessageColumn.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.zendesk.maxwell.replication.vitess;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Types;
+
+/** Logical representation of both column type and value. */
+public class ReplicationMessageColumn {
+ private final String columnName;
+ private final VitessType type;
+ private final byte[] rawValue;
+
+ public ReplicationMessageColumn(String columnName, VitessType type, byte[] rawValue) {
+ this.columnName = columnName.intern();
+ this.type = type;
+ this.rawValue = rawValue;
+ }
+
+ public String getName() {
+ return columnName;
+ }
+
+ public VitessType getType() {
+ return type;
+ }
+
+ public Object getValue() {
+ final VitessColumnValue value = new VitessColumnValue(rawValue);
+
+ if (value.isNull()) {
+ return null;
+ }
+
+ switch (type.getJdbcId()) {
+ case Types.SMALLINT:
+ return value.asShort();
+ case Types.INTEGER:
+ return value.asInteger();
+ case Types.BIGINT:
+ return value.asLong();
+ case Types.BLOB:
+ case Types.BINARY:
+ return value.asBytes();
+ case Types.VARCHAR:
+ return value.asString();
+ case Types.FLOAT:
+ return value.asFloat();
+ case Types.DOUBLE:
+ return value.asDouble();
+ default:
+ break;
+ }
+
+ return value.asDefault(type);
+ }
+
+ public byte[] getRawValue() {
+ return rawValue;
+ }
+
+ @Override
+ public String toString() {
+ return columnName + "=" + new String(rawValue, StandardCharsets.UTF_8);
+ }
+}
diff --git a/src/main/java/com/zendesk/maxwell/replication/vitess/Vgtid.java b/src/main/java/com/zendesk/maxwell/replication/vitess/Vgtid.java
new file mode 100644
index 000000000..1bd4c81d0
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/replication/vitess/Vgtid.java
@@ -0,0 +1,160 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.zendesk.maxwell.replication.vitess;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import binlogdata.Binlogdata;
+
+/** Vitess source position coordinates. */
+public class Vgtid {
+ public static final String CURRENT_GTID = "current";
+ public static final String KEYSPACE_KEY = "keyspace";
+ public static final String SHARD_KEY = "shard";
+ public static final String GTID_KEY = "gtid";
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private final Binlogdata.VGtid rawVgtid;
+ private final List shardGtids = new ArrayList<>();
+
+ private Vgtid(Binlogdata.VGtid rawVgtid) {
+ this.rawVgtid = rawVgtid;
+ for (Binlogdata.ShardGtid shardGtid : rawVgtid.getShardGtidsList()) {
+ shardGtids.add(new ShardGtid(shardGtid.getKeyspace(), shardGtid.getShard(), shardGtid.getGtid()));
+ }
+ }
+
+ private Vgtid(List shardGtids) {
+ this.shardGtids.addAll(shardGtids);
+
+ Binlogdata.VGtid.Builder builder = Binlogdata.VGtid.newBuilder();
+ for (ShardGtid shardGtid : shardGtids) {
+ builder.addShardGtids(
+ Binlogdata.ShardGtid.newBuilder()
+ .setKeyspace(shardGtid.getKeyspace())
+ .setShard(shardGtid.getShard())
+ .setGtid(shardGtid.getGtid())
+ .build()
+ );
+ }
+ this.rawVgtid = builder.build();
+ }
+
+ public static Vgtid of(String shardGtidsInJson) {
+ try {
+ List shardGtids = MAPPER.readValue(shardGtidsInJson, new TypeReference>() { });
+ return of(shardGtids);
+ } catch (JsonProcessingException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public static Vgtid of(Binlogdata.VGtid rawVgtid) {
+ return new Vgtid(rawVgtid);
+ }
+
+ public static Vgtid of(List shardGtids) {
+ return new Vgtid(shardGtids);
+ }
+
+ public Binlogdata.VGtid getRawVgtid() {
+ return rawVgtid;
+ }
+
+ public List getShardGtids() {
+ return shardGtids;
+ }
+
+ public boolean isSingleShard() {
+ return rawVgtid.getShardGtidsCount() == 1;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return MAPPER.writeValueAsString(shardGtids);
+ } catch (JsonProcessingException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Vgtid vgtid = (Vgtid) o;
+ return Objects.equals(rawVgtid, vgtid.rawVgtid) &&
+ Objects.equals(shardGtids, vgtid.shardGtids);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(rawVgtid, shardGtids);
+ }
+
+ @JsonPropertyOrder({ KEYSPACE_KEY, SHARD_KEY, GTID_KEY })
+ public static class ShardGtid {
+ private final String keyspace;
+ private final String shard;
+ private final String gtid;
+
+ @JsonCreator
+ public ShardGtid(
+ @JsonProperty(KEYSPACE_KEY) String keyspace,
+ @JsonProperty(SHARD_KEY) String shard,
+ @JsonProperty(GTID_KEY) String gtid
+ ) {
+ this.keyspace = keyspace.intern();
+ this.shard = shard;
+ this.gtid = gtid;
+ }
+
+ public String getKeyspace() {
+ return keyspace;
+ }
+
+ public String getShard() {
+ return shard;
+ }
+
+ public String getGtid() {
+ return gtid;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ShardGtid shardGtid = (ShardGtid) o;
+ return Objects.equals(keyspace, shardGtid.keyspace) &&
+ Objects.equals(shard, shardGtid.shard) &&
+ Objects.equals(gtid, shardGtid.gtid);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(keyspace, shard, gtid);
+ }
+ }
+}
diff --git a/src/main/java/com/zendesk/maxwell/replication/vitess/VitessColumn.java b/src/main/java/com/zendesk/maxwell/replication/vitess/VitessColumn.java
new file mode 100644
index 000000000..fbfdadcf9
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/replication/vitess/VitessColumn.java
@@ -0,0 +1,23 @@
+package com.zendesk.maxwell.replication.vitess;
+
+public class VitessColumn {
+ private final String name;
+ private final VitessType type;
+
+ public VitessColumn(String name, VitessType type) {
+ this.name = name.intern();
+ this.type = type;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public VitessType getType() {
+ return type;
+ }
+
+ public String toString() {
+ return "Column [name=" + name + ", type=" + type + "]";
+ }
+}
diff --git a/src/main/java/com/zendesk/maxwell/replication/vitess/VitessColumnValue.java b/src/main/java/com/zendesk/maxwell/replication/vitess/VitessColumnValue.java
new file mode 100644
index 000000000..83b18ba22
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/replication/vitess/VitessColumnValue.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.zendesk.maxwell.replication.vitess;
+
+import java.nio.charset.StandardCharsets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A convenient wrapper that wraps the raw bytes value and converts it to Java
+ * value.
+ */
+public class VitessColumnValue {
+ private static final Logger LOGGER = LoggerFactory.getLogger(VitessColumnValue.class);
+
+ private final byte[] value;
+
+ public VitessColumnValue(byte[] value) {
+ this.value = value;
+ }
+
+ public byte[] getRawValue() {
+ return value;
+ }
+
+ public boolean isNull() {
+ return value == null;
+ }
+
+ public byte[] asBytes() {
+ return value;
+ }
+
+ /**
+ * Convert raw bytes value to string using UTF-8 encoding.
+ *
+ * This is *enforced* for VARCHAR and CHAR types, and is *required* for other
+ * non-bytes types (numeric,
+ * timestamp, etc.). For bytes (BLOB, BINARY, etc.) types, the asBytes() should
+ * be used.
+ *
+ * @return the UTF-8 string
+ */
+ public String asString() {
+ return new String(value, StandardCharsets.UTF_8);
+ }
+
+ public Integer asInteger() {
+ return Integer.valueOf(asString());
+ }
+
+ public Short asShort() {
+ return Short.valueOf(asString());
+ }
+
+ public Long asLong() {
+ return Long.valueOf(asString());
+ }
+
+ public Float asFloat() {
+ return Float.valueOf(asString());
+ }
+
+ public Double asDouble() {
+ return Double.valueOf(asString());
+ }
+
+ public Object asDefault(VitessType vitessType) {
+ LOGGER.warn("processing unknown column type {} as string", vitessType);
+ return asString();
+ }
+}
diff --git a/src/main/java/com/zendesk/maxwell/replication/vitess/VitessSchema.java b/src/main/java/com/zendesk/maxwell/replication/vitess/VitessSchema.java
new file mode 100644
index 000000000..a09cabebc
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/replication/vitess/VitessSchema.java
@@ -0,0 +1,125 @@
+package com.zendesk.maxwell.replication.vitess;
+
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import binlogdata.Binlogdata.FieldEvent;
+import io.vitess.proto.Query.Field;
+
+// An in-memory representation of a Vitess schema.
+public class VitessSchema {
+ private static final Logger LOGGER = LoggerFactory.getLogger(VitessSchema.class);
+
+ // See all flags:
+ // https://dev.mysql.com/doc/dev/mysql-server/8.0.12/group__group__cs__column__definition__flags.html
+ private static final int NOT_NULL_FLAG = 1;
+ private static final int PRI_KEY_FLAG = 1 << 1;
+ private static final int UNIQUE_KEY_FLAG = 1 << 2;
+
+ private final Map tables = new HashMap<>();
+
+ public void addTable(VitessTable table) {
+ String tableName = table.getQualifiedTableName().intern();
+ if (tables.containsKey(tableName)) {
+ LOGGER.info("Schema change detected for: {}", table);
+ } else {
+ LOGGER.info("Table schema received for: {}", table);
+ }
+ tables.put(tableName, table);
+ }
+
+ public VitessTable getTable(String name) {
+ return tables.get(name);
+ }
+
+ public void processFieldEvent(FieldEvent event) {
+ if (event == null) {
+ throw new RuntimeException(String.format("fieldEvent is expected from {}", event));
+ }
+
+ String qualifiedTableName = event.getTableName();
+ String[] schemaTableTuple = qualifiedTableName.split("\\.");
+ if (schemaTableTuple.length != 2) {
+ throw new RuntimeException(
+ String.format(
+ "Handling FIELD VEvent. schemaTableTuple should have schema name and table name but has size {}. {} is skipped",
+ schemaTableTuple.length,
+ event));
+ }
+
+ LOGGER.debug("Handling FIELD VEvent: {}", event);
+ String schemaName = schemaTableTuple[0];
+ String tableName = schemaTableTuple[1];
+ int columnCount = event.getFieldsCount();
+
+ List columns = new ArrayList<>(columnCount);
+ for (short i = 0; i < columnCount; ++i) {
+ Field field = event.getFields(i);
+ String columnName = validateColumnName(field.getName(), schemaName, tableName);
+
+ VitessType vitessType = VitessType.resolve(field);
+ if (vitessType.getJdbcId() == Types.OTHER) {
+ LOGGER.error("Cannot resolve JDBC type from VStream field {}", field);
+ }
+
+ KeyMetaData keyMetaData = KeyMetaData.NONE;
+ if ((field.getFlags() & PRI_KEY_FLAG) != 0) {
+ keyMetaData = KeyMetaData.IS_KEY;
+ } else if ((field.getFlags() & UNIQUE_KEY_FLAG) != 0) {
+ keyMetaData = KeyMetaData.IS_UNIQUE_KEY;
+ }
+ boolean optional = (field.getFlags() & NOT_NULL_FLAG) == 0;
+
+ columns.add(new ColumnMetaData(columnName, vitessType, optional, keyMetaData));
+ }
+
+ VitessTable table = createTable(schemaName, tableName, columns);
+ addTable(table);
+ }
+
+ private static String validateColumnName(String columnName, String schemaName, String tableName) {
+ int length = columnName.length();
+ if (length == 0) {
+ throw new IllegalArgumentException(
+ String.format("Empty column name from schema: %s, table: %s", schemaName, tableName));
+ }
+
+ // Vitess VStreamer schema reloading transient bug could cause column names to
+ // be anonymized to @1, @2, etc
+ // We want to fail in this case instead of sending the corrupted row events with
+ // @1, @2 as column names.
+ char first = columnName.charAt(0);
+ if (first == '@') {
+ throw new IllegalArgumentException(
+ String.format(
+ "Illegal prefix '@' for column: %s, from schema: %s, table: %s",
+ columnName,
+ schemaName,
+ tableName));
+ }
+
+ return columnName;
+ }
+
+ private VitessTable createTable(String schemaName, String tableName, List columnsMetaData) {
+ List columns = new ArrayList<>(columnsMetaData.size());
+ List pkColumns = new ArrayList<>();
+
+ for (ColumnMetaData columnMetaData : columnsMetaData) {
+ VitessColumn column = new VitessColumn(columnMetaData.getColumnName(), columnMetaData.getVitessType());
+ columns.add(column);
+
+ if (columnMetaData.getKeyMetaData() == KeyMetaData.IS_KEY) {
+ pkColumns.add(columnMetaData.getColumnName());
+ }
+ }
+
+ return new VitessTable(schemaName, tableName, columns, pkColumns);
+ }
+}
diff --git a/src/main/java/com/zendesk/maxwell/replication/vitess/VitessTable.java b/src/main/java/com/zendesk/maxwell/replication/vitess/VitessTable.java
new file mode 100644
index 000000000..861fbf3aa
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/replication/vitess/VitessTable.java
@@ -0,0 +1,90 @@
+package com.zendesk.maxwell.replication.vitess;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+
+import binlogdata.Binlogdata.RowChange;
+import io.vitess.proto.Query.Row;
+
+public class VitessTable {
+ private final String schemaName;
+ private final String tableName;
+ private final List columns;
+ private final List pkColumns;
+
+ public VitessTable(String schemaName, String tableName, List columns, List pkColumns) {
+ this.schemaName = schemaName.intern();
+ this.tableName = tableName.intern();
+ this.columns = columns;
+ this.pkColumns = pkColumns;
+ }
+
+ public String getSchemaName() {
+ return schemaName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getQualifiedTableName() {
+ return schemaName + "." + tableName;
+ }
+
+ public List getColumns() {
+ return columns;
+ }
+
+ public List getPkColumns() {
+ return pkColumns;
+ }
+
+ public String toString() {
+ return "Table [schemaName=" + schemaName
+ + ", tableName=" + tableName
+ + ", columns=" + columns
+ + ", pkColumns=" + pkColumns
+ + "]";
+ }
+
+ /**
+ * Resolve a specific row from vEvent data to a list of replication message
+ * columns (with values).
+ */
+ public List resolveColumnsFromRow(Row row) {
+ int changedColumnsCnt = row.getLengthsCount();
+ if (columns.size() != changedColumnsCnt) {
+ throw new IllegalStateException(
+ String.format(
+ "The number of columns in the ROW event {} is different from the in-memory table schema {}.",
+ row,
+ this));
+ }
+
+ ByteString rawValues = row.getValues();
+ int rawValueIndex = 0;
+ List eventColumns = new ArrayList<>(changedColumnsCnt);
+ for (short i = 0; i < changedColumnsCnt; i++) {
+ final VitessColumn columnDefinition = columns.get(i);
+ final String columnName = columnDefinition.getName();
+ final VitessType vitessType = columnDefinition.getType();
+
+ final int rawValueLength = (int) row.getLengths(i);
+ final byte[] rawValue = rawValueLength == -1
+ ? null
+ : rawValues.substring(rawValueIndex, rawValueIndex + rawValueLength).toByteArray();
+ if (rawValueLength != -1) {
+ // no update to rawValueIndex when no value in the rawValue
+ rawValueIndex += rawValueLength;
+ }
+
+ final ReplicationMessageColumn column = new ReplicationMessageColumn(columnName, vitessType, rawValue);
+ eventColumns.add(column);
+ }
+ return eventColumns;
+ }
+
+
+}
diff --git a/src/main/java/com/zendesk/maxwell/replication/vitess/VitessType.java b/src/main/java/com/zendesk/maxwell/replication/vitess/VitessType.java
new file mode 100644
index 000000000..cbc30d53d
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/replication/vitess/VitessType.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.zendesk.maxwell.replication.vitess;
+
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import com.google.common.collect.ImmutableList;
+
+import io.vitess.proto.Query.Field;
+
+/** The Vitess table column type */
+public class VitessType {
+ // name of the column type
+ private final String name;
+ // enum of column jdbc type
+ private final int jdbcId;
+ // permitted enum values
+ private final List enumValues;
+
+ public VitessType(String name, int jdbcId) {
+ this(name, jdbcId, Collections.emptyList());
+ }
+
+ public VitessType(String name, int jdbcId, List enumValues) {
+ this.name = name.intern();
+ this.jdbcId = jdbcId;
+
+ ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(enumValues.size());
+ for (String enumValue : enumValues) {
+ builder.add(enumValue.intern());
+ }
+ this.enumValues = builder.build();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public int getJdbcId() {
+ return jdbcId;
+ }
+
+ public List getEnumValues() {
+ return enumValues;
+ }
+
+ public boolean isEnum() {
+ return !enumValues.isEmpty();
+ }
+
+ @Override
+ public String toString() {
+ return "VitessType{" +
+ "name='" + name + '\'' +
+ ", jdbcId=" + jdbcId +
+ ", enumValues=" + enumValues +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ VitessType that = (VitessType) o;
+ return jdbcId == that.jdbcId && name.equals(that.name) && Objects.equals(enumValues, that.enumValues);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, jdbcId, enumValues);
+ }
+
+ // Resolve JDBC type from vstream FIELD event
+ public static VitessType resolve(Field field) {
+ final String type = field.getType().name();
+ switch (type) {
+ case "INT8":
+ case "UINT8":
+ case "INT16":
+ return new VitessType(type, Types.SMALLINT);
+ case "UINT16":
+ case "INT24":
+ case "UINT24":
+ case "INT32":
+ return new VitessType(type, Types.INTEGER);
+ case "ENUM":
+ return new VitessType(type, Types.INTEGER, resolveEnumAndSetValues(field.getColumnType()));
+ case "SET":
+ return new VitessType(type, Types.BIGINT, resolveEnumAndSetValues(field.getColumnType()));
+ case "UINT32":
+ case "INT64":
+ return new VitessType(type, Types.BIGINT);
+ case "BLOB":
+ return new VitessType(type, Types.BLOB);
+ case "VARBINARY":
+ case "BINARY":
+ return new VitessType(type, Types.BINARY);
+ case "UINT64":
+ case "VARCHAR":
+ case "CHAR":
+ case "TEXT":
+ case "JSON":
+ case "DECIMAL":
+ case "TIME":
+ case "DATE":
+ case "DATETIME":
+ case "TIMESTAMP":
+ case "YEAR":
+ return new VitessType(type, Types.VARCHAR);
+ case "FLOAT32":
+ return new VitessType(type, Types.FLOAT);
+ case "FLOAT64":
+ return new VitessType(type, Types.DOUBLE);
+ default:
+ return new VitessType(type, Types.OTHER);
+ }
+ }
+
+ /**
+ * Resolve the list of permitted Enum or Set values from the Enum or Set
+ * Definition
+ *
+ * @param definition
+ * the Enum or Set column definition from the MySQL table.
+ * E.g. "enum('m','l','xl')" or "set('a','b','c')"
+ * @return The list of permitted Enum values or Set values
+ */
+ private static List resolveEnumAndSetValues(String definition) {
+ List values = new ArrayList<>();
+ if (definition == null || definition.length() == 0) {
+ return values;
+ }
+
+ StringBuilder sb = new StringBuilder();
+ boolean startCollecting = false;
+ char[] chars = definition.toCharArray();
+ for (int i = 0; i < chars.length; i++) {
+ if (chars[i] == '\'') {
+ if (chars[i + 1] != '\'') {
+ if (startCollecting) {
+ // end of the Enum/Set value, add the Enum/Set value to the result list
+ values.add(sb.toString());
+ sb.setLength(0);
+ }
+ startCollecting = !startCollecting;
+ } else {
+ sb.append("'");
+ // In MySQL, the single quote in the Enum/Set definition "a'b" is escaped and
+ // becomes "a''b".
+ // Skip the second single-quote
+ i++;
+ }
+ } else if (startCollecting) {
+ sb.append(chars[i]);
+ }
+ }
+ return values;
+ }
+}
diff --git a/src/main/java/com/zendesk/maxwell/row/RowMap.java b/src/main/java/com/zendesk/maxwell/row/RowMap.java
index 969879491..c75eca41f 100644
--- a/src/main/java/com/zendesk/maxwell/row/RowMap.java
+++ b/src/main/java/com/zendesk/maxwell/row/RowMap.java
@@ -318,6 +318,11 @@ public void putOldData(String key, Object value) {
}
public Position getNextPosition() { return nextPosition; }
+
+ public void setNextPosition(Position p) {
+ this.nextPosition = p;
+ }
+
public Position getPosition() { return position; }
public Long getXid() {
diff --git a/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java b/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java
index b61aa271d..ae94680e9 100644
--- a/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java
+++ b/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java
@@ -20,10 +20,10 @@
public class MysqlPositionStore {
static final Logger LOGGER = LoggerFactory.getLogger(MysqlPositionStore.class);
private static final Long DEFAULT_GTID_SERVER_ID = new Long(0);
- private final Long serverID;
- private String clientID;
+ protected final Long serverID;
+ protected String clientID;
private final boolean gtidMode;
- private final ConnectionPool connectionPool;
+ protected final ConnectionPool connectionPool;
public MysqlPositionStore(ConnectionPool pool, Long serverID, String clientID, boolean gtidMode) {
this.connectionPool = pool;
@@ -157,7 +157,7 @@ public Long getLastHeartbeatSent() {
return lastHeartbeat;
}
- private Position positionFromResultSet(ResultSet rs) throws SQLException {
+ protected Position positionFromResultSet(ResultSet rs) throws SQLException {
if ( !rs.next() )
return null;
diff --git a/src/main/java/com/zendesk/maxwell/schema/SchemaStoreSchema.java b/src/main/java/com/zendesk/maxwell/schema/SchemaStoreSchema.java
index ea602adc3..baed9fd41 100644
--- a/src/main/java/com/zendesk/maxwell/schema/SchemaStoreSchema.java
+++ b/src/main/java/com/zendesk/maxwell/schema/SchemaStoreSchema.java
@@ -109,6 +109,10 @@ private static void performAlter(Connection c, String sql) throws SQLException {
public static void upgradeSchemaStoreSchema(Connection c) throws SQLException, IOException {
ArrayList maxwellTables = getMaxwellTables(c);
+ if ( !getTableColumns("positions", c).containsKey("vitess_gtid") ) {
+ performAlter(c, "alter table `positions` add column vitess_gtid text charset latin1");
+ }
+
if ( !getTableColumns("schemas", c).containsKey("deleted") ) {
performAlter(c, "alter table `schemas` add column deleted tinyint(1) not null default 0");
}
diff --git a/src/main/java/com/zendesk/maxwell/schema/VitessPositionStore.java b/src/main/java/com/zendesk/maxwell/schema/VitessPositionStore.java
new file mode 100644
index 000000000..9e558a8ef
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/schema/VitessPositionStore.java
@@ -0,0 +1,85 @@
+package com.zendesk.maxwell.schema;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.zendesk.maxwell.errors.DuplicateProcessException;
+import com.zendesk.maxwell.replication.Position;
+import com.zendesk.maxwell.replication.VitessPosition;
+import com.zendesk.maxwell.replication.vitess.Vgtid;
+import com.zendesk.maxwell.util.ConnectionPool;
+
+public class VitessPositionStore extends MysqlPositionStore {
+ static final Logger LOGGER = LoggerFactory.getLogger(VitessPositionStore.class);
+
+ public VitessPositionStore(ConnectionPool pool, String clientID) {
+ super(pool, 0L, clientID, true);
+ }
+
+ @Override
+ public void set(Position p) throws SQLException, DuplicateProcessException {
+ if (p == null) {
+ LOGGER.debug("Position is null, not persisting it");
+ return;
+ }
+
+ VitessPosition vp = (VitessPosition) p;
+ Vgtid vgtid = vp.getVgtid();
+ if (vgtid == null) {
+ throw new RuntimeException("Vitess position store called with a mysql position");
+ }
+
+ String sql = "INSERT INTO `positions` "
+ + "SET server_id = ?, client_id = ?, vitess_gtid = ? "
+ + "ON DUPLICATE KEY UPDATE client_id = ?, vitess_gtid = ?";
+
+ connectionPool.withSQLRetry(1, (c) -> {
+ try (PreparedStatement s = c.prepareStatement(sql)) {
+ final String vgtidStr = vgtid.toString();
+ LOGGER.debug("Writing VGTID to {}.positions: {}", c.getCatalog(), vgtidStr);
+
+ s.setLong(1, serverID);
+ s.setString(2, clientID);
+ s.setString(3, vgtidStr);
+ s.setString(4, clientID);
+ s.setString(5, vgtidStr);
+
+ s.execute();
+ }
+ });
+ }
+
+ @Override
+ public Position getLatestFromAnyClient() throws SQLException {
+ return null; // Never recover from a different client on Vitess
+ }
+
+
+ @Override
+ public long heartbeat() throws Exception {
+ // Heartbeats are not supported in Vitess.
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ protected Position positionFromResultSet(ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return null;
+ }
+
+ String vgtidString = rs.getString("vitess_gtid");
+ if (vgtidString == null) {
+ LOGGER.warn("Read a null VTGID value from the positions table, assuming we need to start from the current position");
+ return null;
+ }
+
+ LOGGER.debug("Read VGTID from positions: {}", vgtidString);
+ Vgtid vgtid = Vgtid.of(vgtidString);
+
+ return new VitessPosition(vgtid);
+ }
+}
diff --git a/src/main/java/com/zendesk/maxwell/util/AbstractConfig.java b/src/main/java/com/zendesk/maxwell/util/AbstractConfig.java
index 028a4b934..8d6dcbb0b 100644
--- a/src/main/java/com/zendesk/maxwell/util/AbstractConfig.java
+++ b/src/main/java/com/zendesk/maxwell/util/AbstractConfig.java
@@ -12,6 +12,8 @@
import joptsimple.*;
import com.zendesk.maxwell.MaxwellMysqlConfig;
+import com.zendesk.maxwell.MaxwellVitessConfig;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -224,6 +226,25 @@ protected MaxwellMysqlConfig parseMysqlConfig(String prefix, OptionSet options,
return config;
}
+ protected MaxwellVitessConfig parseVitessConfig(OptionSet options, Properties properties) {
+ MaxwellVitessConfig config = new MaxwellVitessConfig();
+ config.vtgateHost = fetchStringOption("vitess_host", options, properties, "localhost");
+ config.vtgatePort = fetchIntegerOption("vitess_port", options, properties, 15991);
+
+ config.usePlaintext = fetchBooleanOption("vitess_plaintext", options, properties, true);
+ config.tlsCA = fetchStringOption("vitess_tls_ca", options, properties, null);
+ config.tlsCert = fetchStringOption("vitess_tls_cert", options, properties, null);
+ config.tlsKey = fetchStringOption("vitess_tls_key", options, properties, null);
+ config.tlsServerName = fetchStringOption("vitess_tls_server_name", options, properties, null);
+
+ config.user = fetchStringOption("vitess_user", options, properties, null);
+ config.password = fetchStringOption("vitess_password", options, properties, null);
+
+ config.keyspace = fetchStringOption("vitess_keyspace", options, properties, null);
+ config.shard = fetchStringOption("vitess_shard", options, properties, "");
+ return config;
+ }
+
private SSLMode getSslModeFromString(String sslMode) {
if (sslMode != null) {
for (SSLMode mode : SSLMode.values()) {
diff --git a/src/main/resources/sql/maxwell_schema.sql b/src/main/resources/sql/maxwell_schema.sql
index cb89699dd..3c635bfc3 100644
--- a/src/main/resources/sql/maxwell_schema.sql
+++ b/src/main/resources/sql/maxwell_schema.sql
@@ -52,6 +52,7 @@ CREATE TABLE IF NOT EXISTS `positions` (
binlog_file varchar(255),
binlog_position int unsigned,
gtid_set varchar(4096),
+ vgtid text charset latin1,
client_id varchar(255) charset latin1 not null default 'maxwell',
heartbeat_at bigint null default null,
last_heartbeat_read bigint null default null,
diff --git a/vitess/01-start-vitess.sh b/vitess/01-start-vitess.sh
new file mode 100755
index 000000000..2e156ee26
--- /dev/null
+++ b/vitess/01-start-vitess.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+
+set -x
+
+DOCKER_BIN=$(which docker)
+if [[ "$DOCKER_BIN" != "" ]]; then
+ echo "Docker is installed"
+ COMPOSE_COMMAND="docker compose"
+else
+ echo "Docker is not installed, trying podman..."
+ PODMAN_BIN=$(which podman)
+ if [ -z "$PODMAN_BIN" ]; then
+ echo "Podman is not installed either, exiting..."
+ exit 1
+ fi
+ COMPOSE_COMMAND="podman-compose"
+fi
+
+# Stop and remove any existing containers
+$COMPOSE_COMMAND -f vitess/docker-compose.yml down
+
+# Start the Vitess cluster
+$COMPOSE_COMMAND -f vitess/docker-compose.yml up
diff --git a/vitess/02-start-maxwell.sh b/vitess/02-start-maxwell.sh
new file mode 100755
index 000000000..51f0f9383
--- /dev/null
+++ b/vitess/02-start-maxwell.sh
@@ -0,0 +1,6 @@
+#!/usr/bin/env bash
+
+set -ex
+
+mvn clean package -DskipTests=true -Dmaven.javadoc.skip=true
+bin/maxwell --kafka_version 2.7.0 --config vitess/config.properties
diff --git a/vitess/03-cleanup-vitess.sh b/vitess/03-cleanup-vitess.sh
new file mode 100755
index 000000000..98ee5c7cc
--- /dev/null
+++ b/vitess/03-cleanup-vitess.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+
+set -x
+
+DOCKER_BIN=$(which docker)
+if [[ "$DOCKER_BIN" != "" ]]; then
+ echo "Docker is installed"
+ COMPOSE_COMMAND="docker compose"
+else
+ echo "Docker is not installed, trying podman..."
+ PODMAN_BIN=$(which podman)
+ if [ -z "$PODMAN_BIN" ]; then
+ echo "Podman is not installed either, exiting..."
+ exit 1
+ fi
+ COMPOSE_COMMAND="podman-compose"
+fi
+
+# Stop and remove any existing containers
+$COMPOSE_COMMAND -f vitess/docker-compose.yml down
diff --git a/vitess/README.md b/vitess/README.md
new file mode 100644
index 000000000..96f429c85
--- /dev/null
+++ b/vitess/README.md
@@ -0,0 +1,72 @@
+# Vitess Support for Maxwell
+
+This directory contains a set of scripts and configuration files to help develop
+and test Vitess support in Maxwell.
+
+## Starting Vitess Test Server
+
+The easiest way to develop and test Vitess clients is by running the `vttestserver`
+Docker image. It contains all of the components of Vitess + a MySQL instance configured
+in a way, that mimics a real Vitess cluster close enough for most development purposes.
+
+See https://vitess.io/docs/13.0/get-started/vttestserver-docker-image/ for more details.
+
+For developing Maxwell support for Vitess, you can run run vttestserver using the
+provided script:
+
+```
+$ vitess/01-start-vitess.sh
+```
+
+The script will use Docker or Podman to start vttestserver and expose its gRPC and MySQL ports
+so that Maxwell can connect to those.
+
+## Running Maxwell against Vitess
+
+To start Maxwell against the provided vttestserver cluster, you can use the provided script:
+
+```
+$ vitess/02-start-maxwell.sh
+```
+The script will build Maxwell and then start it with the provided properties file
+(`vitess/config.properties`). The properties file configures the following:
+
+1. Maxwell's schema is stored in a dedicated Vitess keyspace called `maxwell`.
+2. Maxwell follows all changes in a Vitess keyspace called `app_shard`.
+
+## Connecting to Vitess with a MySQL client
+
+To play with the provided vttestserver cluster, you can connect to it using any MySQL client.
+The test server does not require any credentials.
+
+Here is an example command:
+
+```
+mysql -h 127.0.0.1 -P 33577
+```
+
+The MySQL interface in vtgate will expose multiple keyspaces, which you can read and write
+into as usual.
+
+## Connecting to VStream API using a gRPC client
+
+If you would like to experiment with the gRPC APIs exposed by Vitess, you can use any gRPC
+client and connect to `localhost:33575`.
+
+Here is an example [gRPCurl](https://github.com/fullstorydev/grpcurl) command for following all
+changes made to the `app_shard` keyspace:
+
+```
+grpcurl -plaintext -d '{"vgtid":{"shard_gtids":[{"keyspace":"app_shard", "gtid":"current"}]}}' localhost:33575 vtgateservice.Vitess.VStream
+```
+
+This will run the VStream API and send all events to console as a JSON stream.
+
+## Cleaning up after testing
+
+When you're done working with the vttestserver cluster, you may want to run the script provided to
+clean up any remaining Docker containers:
+
+```
+$ vitess/03-cleanup-vitess.sh
+```
diff --git a/vitess/config.properties b/vitess/config.properties
new file mode 100644
index 000000000..cc941d739
--- /dev/null
+++ b/vitess/config.properties
@@ -0,0 +1,42 @@
+#
+# This is a sample configuration file for running Maxwell against a Vitess cluster
+#
+
+# Produce debug-level logging
+log_level=debug
+
+# Send all events to console
+producer=stdout
+
+# Enable vitess support
+vitess=true
+vitess_keyspace=app_shard
+vitess_shard=
+
+# VTgate gRPC address
+vitess_host=127.0.0.1
+vitess_port=43575
+
+# Credentials to use for Vitess gRPC calls
+# vitess_user=banana
+# vitess_password=ban@na
+
+# Use TLS to connect to vtgate
+# vitess_plaintext=false
+
+# Override the TLS server name used for server certificate verification (default: vitess_host)
+# vitess_tls_server_name=vitess.test
+
+# Use a custom TLS CA certificate to verify the server certificate
+# vitess_tls_ca=/Users/kovyrin/src/github.com/Shopify/vttestserver/tls/vitess.ca.pem
+
+# Use a client TLS certificate to authenticate to vtgate
+# vitess_tls_cert=/Users/kovyrin/src/github.com/Shopify/vttestserver/tls/client.pem
+# vitess_tls_key=/Users/kovyrin/src/github.com/Shopify/vttestserver/tls/client.key
+
+# MySQL login info for storing maxwell metadata (a separate vitess keyspace, no sharding)
+host=127.0.0.1
+port=43577
+user=root
+password=
+schema_database=maxwell
diff --git a/vitess/docker-compose.yml b/vitess/docker-compose.yml
new file mode 100644
index 000000000..07ff6e5a1
--- /dev/null
+++ b/vitess/docker-compose.yml
@@ -0,0 +1,19 @@
+version: "3.9"
+services:
+ vttestserver:
+ image: vitess/vttestserver:mysql57
+ ports:
+ - "43575:33575" # gRPC (VTGate, etc)
+ - "43577:33577" # MySQL
+ environment:
+ - PORT=33574
+ - PLANNER_VERSION=gen4fallback
+ - KEYSPACES=maxwell,app_master,app_shard,sharded_tests
+ - NUM_SHARDS=1,1,1,2
+ - MYSQL_MAX_CONNECTIONS=70000
+ - MYSQL_BIND_HOST=0.0.0.0
+ healthcheck:
+ test: ["CMD", "mysqladmin", "ping", "-h127.0.0.1", "-P43577"]
+ interval: 5s
+ timeout: 2s
+ retries: 5