Skip to content

Commit

Permalink
refactor(schema): add database filtering to JSON detection
Browse files Browse the repository at this point in the history
Updated `detectMariaDBJSON` to accept `dbName` as a parameter,
enabling filtering by `CONSTRAINT_SCHEMA` for more precise JSON
constraint detection within specified databases.
  • Loading branch information
CoreyWinkelmannPP committed Nov 13, 2024
1 parent d498902 commit 004f828
Showing 1 changed file with 45 additions and 44 deletions.
89 changes: 45 additions & 44 deletions src/main/java/com/zendesk/maxwell/schema/SchemaCapturer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ public class SchemaCapturer implements AutoCloseable {
static final Logger LOGGER = LoggerFactory.getLogger(SchemaCapturer.class);

public static final HashSet<String> IGNORED_DATABASES = new HashSet<>(
Arrays.asList(new String[]{"performance_schema", "information_schema"})
);
Arrays.asList(new String[] { "performance_schema", "information_schema" }));

private final Set<String> includeDatabases;
private final Set<String> includeTables;
Expand All @@ -36,12 +35,12 @@ public class SchemaCapturer implements AutoCloseable {
private final PreparedStatement columnPreparedStatement;
private final PreparedStatement pkPreparedStatement;


public SchemaCapturer(Connection c, CaseSensitivity sensitivity) throws SQLException {
this(c, sensitivity, Collections.emptySet(), Collections.emptySet());
}

SchemaCapturer(Connection c, CaseSensitivity sensitivity, Set<String> includeDatabases, Set<String> includeTables) throws SQLException {
SchemaCapturer(Connection c, CaseSensitivity sensitivity, Set<String> includeDatabases, Set<String> includeTables)
throws SQLException {
this.includeDatabases = includeDatabases;
this.includeTables = includeTables;
this.connection = c;
Expand Down Expand Up @@ -99,12 +98,10 @@ public Schema capture() throws SQLException {
LOGGER.debug("Capturing schemas...");
ArrayList<Database> databases = new ArrayList<>();

String dbCaptureQuery =
"SELECT SCHEMA_NAME, DEFAULT_CHARACTER_SET_NAME FROM INFORMATION_SCHEMA.SCHEMATA";
String dbCaptureQuery = "SELECT SCHEMA_NAME, DEFAULT_CHARACTER_SET_NAME FROM INFORMATION_SCHEMA.SCHEMATA";

if ( includeDatabases.size() > 0 ) {
dbCaptureQuery +=
" WHERE SCHEMA_NAME IN " + Sql.inListSQL(includeDatabases.size());
if (includeDatabases.size() > 0) {
dbCaptureQuery += " WHERE SCHEMA_NAME IN " + Sql.inListSQL(includeDatabases.size());
}
dbCaptureQuery += " ORDER BY SCHEMA_NAME";

Expand Down Expand Up @@ -135,13 +132,14 @@ public Schema capture() throws SQLException {
}
LOGGER.debug("{} database schemas captured!", size);


Schema s = new Schema(databases, captureDefaultCharset(), this.sensitivity);
try {
if ( isMariaDB() && mariaSupportsJSON()) {
detectMariaDBJSON(s);
if (isMariaDB() && mariaSupportsJSON()) {
for (String dbName : s.getDatabaseNames()) {
detectMariaDBJSON(s, dbName);
}
}
} catch ( InvalidSchemaError e ) {
} catch (InvalidSchemaError e) {
e.printStackTrace();
}
return s;
Expand All @@ -150,13 +148,12 @@ public Schema capture() throws SQLException {
private String captureDefaultCharset() throws SQLException {
LOGGER.debug("Capturing Default Charset");
try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("select @@character_set_server")) {
ResultSet rs = stmt.executeQuery("select @@character_set_server")) {
rs.next();
return rs.getString("@@character_set_server");
}
}


private void captureDatabase(Database db) throws SQLException {
tablePreparedStatement.setString(1, db.getName());
Sql.prepareInList(tablePreparedStatement, 2, includeTables);
Expand All @@ -173,9 +170,8 @@ private void captureDatabase(Database db) throws SQLException {
captureTables(db, tables);
}


private boolean isMySQLAtLeast56() throws SQLException {
if ( isMariaDB() )
if (isMariaDB())
return true;

DatabaseMetaData meta = connection.getMetaData();
Expand All @@ -190,13 +186,14 @@ private boolean isMariaDB() throws SQLException {
}

static final String MARIA_VERSION_REGEX = "[\\d\\.]+-(\\d+)\\.(\\d+)";

private boolean mariaSupportsJSON() throws SQLException {
DatabaseMetaData meta = connection.getMetaData();
String versionString = meta.getDatabaseProductVersion();
Pattern pattern = Pattern.compile(MARIA_VERSION_REGEX);
Matcher m = pattern.matcher(versionString);

if ( m.find() ) {
if (m.find()) {
int major = Integer.parseInt(m.group(1));
int minor = Integer.parseInt(m.group(2));

Expand Down Expand Up @@ -267,7 +264,7 @@ private void captureTablesPK(Database db, HashMap<String, Table> tables) throws
String columnName = rs.getString("COLUMN_NAME");

ArrayList<String> pkList = tablePKMap.get(tableName);
if ( pkList != null )
if (pkList != null)
pkList.add(ordinalPosition - 1, columnName);
}
}
Expand All @@ -294,7 +291,7 @@ static String[] extractEnumValues(String expandedType) {
Matcher enumMatcher = pattern.matcher(enumValues);

List<String> result = new ArrayList<>();
while(enumMatcher.find()) {
while (enumMatcher.find()) {
String value = enumMatcher.group(0);
if (value.startsWith("'"))
value = value.substring(1);
Expand All @@ -309,44 +306,48 @@ static String[] extractEnumValues(String expandedType) {
@Override
public void close() throws SQLException {
try (PreparedStatement p1 = tablePreparedStatement;
PreparedStatement p2 = columnPreparedStatement;
PreparedStatement p3 = pkPreparedStatement) {
PreparedStatement p2 = columnPreparedStatement;
PreparedStatement p3 = pkPreparedStatement) {
// auto-close shared prepared statements
}
}

private void detectMariaDBJSON(Schema schema) throws SQLException, InvalidSchemaError {
private void detectMariaDBJSON(Schema schema, String dbName) throws SQLException, InvalidSchemaError {
String checkConstraintSQL = "SELECT CONSTRAINT_SCHEMA, TABLE_NAME, CONSTRAINT_NAME, CHECK_CLAUSE " +
"from INFORMATION_SCHEMA.CHECK_CONSTRAINTS " +
"where CHECK_CLAUSE LIKE 'json_valid(%)'";
"from INFORMATION_SCHEMA.CHECK_CONSTRAINTS " +
"where CONSTRAINT_SCHEMA = ? AND CHECK_CLAUSE LIKE 'json_valid(%)'";

String regex = "json_valid\\(`(.*)`\\)";
Pattern pattern = Pattern.compile(regex);

try (
PreparedStatement statement = connection.prepareStatement(checkConstraintSQL);
ResultSet rs = statement.executeQuery()
) {
while ( rs.next() ) {
String checkClause = rs.getString("CHECK_CLAUSE");
Matcher m = pattern.matcher(checkClause);
if ( m.find() ) {
String column = m.group(1);
Database d = schema.findDatabase(rs.getString("CONSTRAINT_SCHEMA"));
if ( d == null ) continue;
Table t = d.findTable(rs.getString("TABLE_NAME"));
if ( t == null ) continue;
short i = t.findColumnIndex(column);
if ( i < 0 ) continue;

ColumnDef cd = t.findColumn(i);
if ( cd instanceof StringColumnDef ) {
t.replaceColumn(i, JsonColumnDef.create(cd.getName(), "json", i));
PreparedStatement statement = connection.prepareStatement(checkConstraintSQL)) {
statement.setString(1, dbName);

try (ResultSet rs = statement.executeQuery()) {
while (rs.next()) {
String checkClause = rs.getString("CHECK_CLAUSE");
Matcher m = pattern.matcher(checkClause);
if (m.find()) {
String column = m.group(1);
Database d = schema.findDatabase(rs.getString("CONSTRAINT_SCHEMA"));
if (d == null)
continue;
Table t = d.findTable(rs.getString("TABLE_NAME"));
if (t == null)
continue;
short i = t.findColumnIndex(column);
if (i < 0)
continue;

ColumnDef cd = t.findColumn(i);
if (cd instanceof StringColumnDef) {
t.replaceColumn(i, JsonColumnDef.create(cd.getName(), "json", i));
}
}
}
}
}

}

}

0 comments on commit 004f828

Please sign in to comment.