Skip to content

Commit

Permalink
Merge pull request #5 from zendesk/sets_and_enums
Browse files Browse the repository at this point in the history
Sets and enums
  • Loading branch information
Ben Osheroff committed Mar 2, 2015
2 parents 8f347ab + 2976c3f commit 0318ba5
Show file tree
Hide file tree
Showing 17 changed files with 209 additions and 22 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<dependency>
<groupId>com.zendesk</groupId>
<artifactId>open-replicator</artifactId>
<version>1.1.2</version>
<version>1.1.3</version>
</dependency>
</dependencies>

Expand All @@ -87,7 +87,7 @@
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
<version>4.3</version>
<version>4.5</version>
<executions>
<execution>
<id>antlr</id>
Expand Down
11 changes: 9 additions & 2 deletions src/main/antlr4/imports/column_definitions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ data_type:
generic_type
| signed_type
| string_type
// | enum_type
// | set_type
| enumerated_type
;

// from http://dev.mysql.com/doc/refman/5.1/en/create-table.html
Expand All @@ -42,6 +41,12 @@ string_type: // getting the encoding here
charset_def?
;

enumerated_type:
col_type=(ENUM | SET)
'(' enumerated_values ')'
charset_def?
;

column_options:
nullability
| default_value
Expand All @@ -52,6 +57,8 @@ column_options:
| STORAGE (DISK|MEMORY|DEFAULT)
;

enumerated_values: enum_value (',' enum_value)*;
enum_value: STRING_LITERAL;

charset_def: (character_set | collation)+;
character_set: ((CHARACTER SET) | CHARSET) charset_name;
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/zendesk/maxwell/BinlogPosition.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ public long getOffset() {
public String getFile() {
return file;
}

@Override
public String toString() {
return "BinlogPosition[" + file + ":" + offset + "]";
}
}
6 changes: 5 additions & 1 deletion src/main/java/com/zendesk/maxwell/Maxwell.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zendesk.maxwell.schema.Schema;
import com.zendesk.maxwell.schema.SchemaCapturer;
import com.zendesk.maxwell.schema.SchemaStore;

public class Maxwell {
private Schema schema;
private MaxwellConfig config;
static final Logger LOGGER = LoggerFactory.getLogger(SchemaStore.class);

private void initFirstRun() throws SQLException, IOException {
Connection connection = this.config.getMasterConnection();
Expand All @@ -36,6 +39,7 @@ private void run(String[] args) throws Exception {

this.config = MaxwellConfig.fromPropfile(args[0]);
if ( this.config.getInitialPosition() != null ) {
LOGGER.info("Maxwell is booting, starting at " + this.config.getInitialPosition());
SchemaStore store = SchemaStore.restore(this.config.getMasterConnection(), this.config.getInitialPosition());
this.schema = store.getSchema();
} else {
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/com/zendesk/maxwell/MaxwellConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ public static MaxwellConfig fromPropfile(String filename) throws IOException {
config.mysqlPort = Integer.valueOf(p.getProperty("port", "3306"));

config.currentPositionFile = p.getProperty("position_file", "maxwell.position");

System.out.println(config.mysqlPassword);
return config;
}
}
24 changes: 23 additions & 1 deletion src/main/java/com/zendesk/maxwell/schema/SchemaCapturer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -101,6 +104,7 @@ private void captureTable(Table t) throws SQLException {
ResultSet r = infoSchemaStmt.executeQuery();

while(r.next()) {
String[] enumValues = null;
String colName = r.getString("COLUMN_NAME");
String colType = r.getString("DATA_TYPE");
String colEnc = r.getString("CHARACTER_SET_NAME");
Expand All @@ -110,8 +114,26 @@ private void captureTable(Table t) throws SQLException {
if ( r.getString("COLUMN_KEY").equals("PRI") )
t.pkIndex = i;

t.getColumnList().add(ColumnDef.build(t.getName(), colName, colEnc, colType, colPos, colSigned));
if ( colType.equals("enum") || colType.equals("set")) {
String expandedType = r.getString("COLUMN_TYPE");

enumValues = extractEnumValues(expandedType);
}

t.getColumnList().add(ColumnDef.build(t.getName(), colName, colEnc, colType, colPos, colSigned, enumValues));
i++;
}
}

private static String[] extractEnumValues(String expandedType) {
String[] enumValues;
Matcher matcher = Pattern.compile("(enum|set)\\((.*)\\)").matcher(expandedType);
matcher.matches(); // why do you tease me so.

enumValues = StringUtils.split(matcher.group(2), ",");
for(int j=0 ; j < enumValues.length; j++) {
enumValues[j] = enumValues[j].substring(1, enumValues[j].length() - 1);
}
return enumValues;
}
}
22 changes: 19 additions & 3 deletions src/main/java/com/zendesk/maxwell/schema/SchemaStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public SchemaStore(Connection connection) throws SQLException {
Statement.RETURN_GENERATED_KEYS);
this.columnInsert = connection
.prepareStatement(
"INSERT INTO `maxwell`.`columns` SET schema_id = ?, table_id = ?, name = ?, encoding=?, coltype=?, is_signed=?",
"INSERT INTO `maxwell`.`columns` SET schema_id = ?, table_id = ?, "
+ "name = ?, encoding=?, coltype=?, is_signed=?, enum_values=?",
Statement.RETURN_GENERATED_KEYS);
}

Expand Down Expand Up @@ -92,8 +93,15 @@ private void saveSchema() throws SQLException {
for (Table t : d.getTableList()) {
Integer tableId = executeInsert(tableInsert, schemaId, dbId, t.getName(), t.getEncoding());
for (ColumnDef c : t.getColumnList()) {
String [] enumValues = c.getEnumValues();
String enumValuesSQL = null;

if ( enumValues != null ) {
enumValuesSQL = StringUtils.join(enumValues, ",");
}

executeInsert(columnInsert, schemaId, tableId, c.getName(),
c.getEncoding(), c.getType(), c.getSigned() ? 1 : 0);
c.getEncoding(), c.getType(), c.getSigned() ? 1 : 0, enumValuesSQL);
}
}
}
Expand Down Expand Up @@ -156,6 +164,8 @@ private void restoreFrom(BinlogPosition targetPosition)
this.position = new BinlogPosition(schemaRS.getInt("binlog_position"),
schemaRS.getString("binlog_file"));

LOGGER.info("Restoring schema id " + schemaRS.getInt("id") + " (last modified at " + this.position + ")");

p = connection.prepareStatement("SELECT * from `maxwell`.`databases` where schema_id = ? ORDER by id");
p.setInt(1, schemaRS.getInt("id"));
ResultSet dbRS = p.executeQuery();
Expand Down Expand Up @@ -189,16 +199,22 @@ private void restoreTable(Database d, String name, int id, String encoding) thro

int i = 0;
while (cRS.next()) {
String[] enumValues = null;
if ( cRS.getString("enum_values") != null )
enumValues = StringUtils.split(cRS.getString("enum_values"), ",");

ColumnDef c = ColumnDef.build(t.getName(),
cRS.getString("name"), cRS.getString("encoding"),
cRS.getString("coltype"), i++,
cRS.getInt("is_signed") == 1);
cRS.getInt("is_signed") == 1,
enumValues);
t.getColumnList().add(c);
}
}

private ResultSet findSchema(BinlogPosition targetPosition)
throws SQLException {
LOGGER.debug("looking to restore schema at target position " + targetPosition);
PreparedStatement s = connection.prepareStatement(
"SELECT * from `maxwell`.`schemas` "
+ "WHERE (binlog_file < ?) OR (binlog_file = ? and binlog_position <= ?) "
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/zendesk/maxwell/schema/Table.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.zendesk.maxwell.schema;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang.StringUtils;

import com.zendesk.maxwell.schema.columndef.ColumnDef;
import com.zendesk.maxwell.schema.columndef.StringColumnDef;

Expand Down Expand Up @@ -92,6 +95,13 @@ private void diffColumnList(List<String> diffs, Table a, Table b, String nameA,
+ " vs "
+ other.getPos()
+ " in " + nameB);
} else if ( !Arrays.deepEquals(column.getEnumValues(), other.getEnumValues()) ) {
diffs.add(colName + "has an enum value mismatch, "
+ StringUtils.join(column.getEnumValues(), ",")
+ " vs "
+ StringUtils.join(other.getEnumValues(), ",")
+ " in " + nameB);

}
}
}
Expand Down
16 changes: 14 additions & 2 deletions src/main/java/com/zendesk/maxwell/schema/columndef/ColumnDef.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.zendesk.maxwell.schema.columndef;

import org.apache.commons.lang.StringUtils;

public abstract class ColumnDef {
protected final String tableName;
protected final String name;
protected final String type;
protected String[] enumValues;
private int pos;
public boolean signed;
public String encoding;
Expand All @@ -24,10 +27,10 @@ public Object asJSON(Object value) {
}

public ColumnDef copy() {
return build(this.tableName, this.name, this.encoding, this.type, this.pos, this.signed);
return build(this.tableName, this.name, this.encoding, this.type, this.pos, this.signed, this.enumValues);
}

public static ColumnDef build(String tableName, String name, String encoding, String type, int pos, boolean signed) {
public static ColumnDef build(String tableName, String name, String encoding, String type, int pos, boolean signed, String enumValues[]) {
switch(type) {
case "tinyint":
case "smallint":
Expand Down Expand Up @@ -62,6 +65,10 @@ public static ColumnDef build(String tableName, String name, String encoding, St
return new YearColumnDef(tableName, name, type, pos);
case "time":
return new TimeColumnDef(tableName, name, type, pos);
case "enum":
return new EnumColumnDef(tableName, name, type, pos, enumValues);
case "set":
return new SetColumnDef(tableName, name, type, pos, enumValues);
default:
throw new IllegalArgumentException("unsupported column type " + type);
}
Expand Down Expand Up @@ -94,4 +101,9 @@ public String getEncoding() {
public boolean getSigned() {
return this.signed;
}

public String[] getEnumValues() {
return enumValues;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.zendesk.maxwell.schema.columndef;

import com.google.code.or.common.util.MySQLConstants;

public class EnumColumnDef extends ColumnDef {
public EnumColumnDef(String tableName, String name, String type, int pos, String[] enumValues) {
super(tableName, name, type, pos);
this.enumValues = enumValues;
}

@Override
public boolean matchesMysqlType(int type) {
return type == MySQLConstants.TYPE_ENUM;
}

@Override
public String toSQL(Object value) {
return "'" + asString(value) + "'";
}

@Override
public String asJSON(Object value) {
return asString(value);
}

private String asString(Object value) {
return enumValues[((Integer) value) - 1];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.zendesk.maxwell.schema.columndef;

import java.util.ArrayList;

import org.apache.commons.lang.StringUtils;

import com.google.code.or.common.util.MySQLConstants;

public class SetColumnDef extends ColumnDef {
public SetColumnDef(String tableName, String name, String type, int pos, String[] enumValues) {
super(tableName, name, type, pos);
this.enumValues = enumValues;
}

@Override
public boolean matchesMysqlType(int type) {
return type == MySQLConstants.TYPE_SET;
}

@Override
public String toSQL(Object value) {
return "'" + StringUtils.join(asList(value), "'") + "'";
}

@Override
public Object asJSON(Object value) {
return asList(value);
}

private ArrayList<String> asList(Object value) {
ArrayList<String> values = new ArrayList<>();
long v = (Long) value;
for(int i = 0; i < enumValues.length; i++) {
if ( ((v >> i) & 1) == 1 ) {
values.add(enumValues[i]);
}
}
return values;
}
}
Loading

0 comments on commit 0318ba5

Please sign in to comment.