Skip to content

Commit

Permalink
refactor(schema): add database filtering to JSON detection
Browse files Browse the repository at this point in the history
  • Loading branch information
CoreyWinkelmannPP committed Nov 14, 2024
1 parent d498902 commit 449e203
Showing 1 changed file with 26 additions and 22 deletions.
48 changes: 26 additions & 22 deletions src/main/java/com/zendesk/maxwell/schema/SchemaCapturer.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ public Schema capture() throws SQLException {
Schema s = new Schema(databases, captureDefaultCharset(), this.sensitivity);
try {
if ( isMariaDB() && mariaSupportsJSON()) {
detectMariaDBJSON(s);
for (String dbName : s.getDatabaseNames()) {
detectMariaDBJSON(s, dbName);
}
}
} catch ( InvalidSchemaError e ) {
e.printStackTrace();
Expand Down Expand Up @@ -315,38 +317,40 @@ public void close() throws SQLException {
}
}

private void detectMariaDBJSON(Schema schema) throws SQLException, InvalidSchemaError {
private void detectMariaDBJSON(Schema schema, String dbName) throws SQLException, InvalidSchemaError {
String checkConstraintSQL = "SELECT CONSTRAINT_SCHEMA, TABLE_NAME, CONSTRAINT_NAME, CHECK_CLAUSE " +
"from INFORMATION_SCHEMA.CHECK_CONSTRAINTS " +
"where CHECK_CLAUSE LIKE 'json_valid(%)'";
"where CONSTRAINT_SCHEMA = ? AND CHECK_CLAUSE LIKE 'json_valid(%)'";

String regex = "json_valid\\(`(.*)`\\)";
Pattern pattern = Pattern.compile(regex);

try (
PreparedStatement statement = connection.prepareStatement(checkConstraintSQL);
ResultSet rs = statement.executeQuery()
) {
while ( rs.next() ) {
String checkClause = rs.getString("CHECK_CLAUSE");
Matcher m = pattern.matcher(checkClause);
if ( m.find() ) {
String column = m.group(1);
Database d = schema.findDatabase(rs.getString("CONSTRAINT_SCHEMA"));
if ( d == null ) continue;
Table t = d.findTable(rs.getString("TABLE_NAME"));
if ( t == null ) continue;
short i = t.findColumnIndex(column);
if ( i < 0 ) continue;

ColumnDef cd = t.findColumn(i);
if ( cd instanceof StringColumnDef ) {
t.replaceColumn(i, JsonColumnDef.create(cd.getName(), "json", i));
PreparedStatement statement = connection.prepareStatement(checkConstraintSQL)) {
statement.setString(1, dbName);

try (
ResultSet rs = statement.executeQuery()) {
while ( rs.next() ) {
String checkClause = rs.getString("CHECK_CLAUSE");
Matcher m = pattern.matcher(checkClause);
if ( m.find() ) {
String column = m.group(1);
Database d = schema.findDatabase(rs.getString("CONSTRAINT_SCHEMA"));
if ( d == null ) continue;
Table t = d.findTable(rs.getString("TABLE_NAME"));
if ( t == null ) continue;
short i = t.findColumnIndex(column);
if ( i < 0 ) continue;

ColumnDef cd = t.findColumn(i);
if ( cd instanceof StringColumnDef ) {
t.replaceColumn(i, JsonColumnDef.create(cd.getName(), "json", i));
}
}
}
}
}

}

}

0 comments on commit 449e203

Please sign in to comment.