diff --git a/src/main/java/com/zendesk/maxwell/schema/SchemaCapturer.java b/src/main/java/com/zendesk/maxwell/schema/SchemaCapturer.java index b9233c828..3958b2e07 100644 --- a/src/main/java/com/zendesk/maxwell/schema/SchemaCapturer.java +++ b/src/main/java/com/zendesk/maxwell/schema/SchemaCapturer.java @@ -24,8 +24,7 @@ public class SchemaCapturer implements AutoCloseable { static final Logger LOGGER = LoggerFactory.getLogger(SchemaCapturer.class); public static final HashSet IGNORED_DATABASES = new HashSet<>( - Arrays.asList(new String[]{"performance_schema", "information_schema"}) - ); + Arrays.asList(new String[] { "performance_schema", "information_schema" })); private final Set includeDatabases; private final Set includeTables; @@ -36,12 +35,12 @@ public class SchemaCapturer implements AutoCloseable { private final PreparedStatement columnPreparedStatement; private final PreparedStatement pkPreparedStatement; - public SchemaCapturer(Connection c, CaseSensitivity sensitivity) throws SQLException { this(c, sensitivity, Collections.emptySet(), Collections.emptySet()); } - SchemaCapturer(Connection c, CaseSensitivity sensitivity, Set includeDatabases, Set includeTables) throws SQLException { + SchemaCapturer(Connection c, CaseSensitivity sensitivity, Set includeDatabases, Set includeTables) + throws SQLException { this.includeDatabases = includeDatabases; this.includeTables = includeTables; this.connection = c; @@ -99,12 +98,10 @@ public Schema capture() throws SQLException { LOGGER.debug("Capturing schemas..."); ArrayList databases = new ArrayList<>(); - String dbCaptureQuery = - "SELECT SCHEMA_NAME, DEFAULT_CHARACTER_SET_NAME FROM INFORMATION_SCHEMA.SCHEMATA"; + String dbCaptureQuery = "SELECT SCHEMA_NAME, DEFAULT_CHARACTER_SET_NAME FROM INFORMATION_SCHEMA.SCHEMATA"; - if ( includeDatabases.size() > 0 ) { - dbCaptureQuery += - " WHERE SCHEMA_NAME IN " + Sql.inListSQL(includeDatabases.size()); + if (includeDatabases.size() > 0) { + dbCaptureQuery += " WHERE SCHEMA_NAME IN " + Sql.inListSQL(includeDatabases.size()); } dbCaptureQuery += " ORDER BY SCHEMA_NAME"; @@ -135,13 +132,14 @@ public Schema capture() throws SQLException { } LOGGER.debug("{} database schemas captured!", size); - Schema s = new Schema(databases, captureDefaultCharset(), this.sensitivity); try { - if ( isMariaDB() && mariaSupportsJSON()) { - detectMariaDBJSON(s); + if (isMariaDB() && mariaSupportsJSON()) { + for (String dbName : s.getDatabaseNames()) { + detectMariaDBJSON(s, dbName); + } } - } catch ( InvalidSchemaError e ) { + } catch (InvalidSchemaError e) { e.printStackTrace(); } return s; @@ -150,13 +148,12 @@ public Schema capture() throws SQLException { private String captureDefaultCharset() throws SQLException { LOGGER.debug("Capturing Default Charset"); try (Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery("select @@character_set_server")) { + ResultSet rs = stmt.executeQuery("select @@character_set_server")) { rs.next(); return rs.getString("@@character_set_server"); } } - private void captureDatabase(Database db) throws SQLException { tablePreparedStatement.setString(1, db.getName()); Sql.prepareInList(tablePreparedStatement, 2, includeTables); @@ -173,9 +170,8 @@ private void captureDatabase(Database db) throws SQLException { captureTables(db, tables); } - private boolean isMySQLAtLeast56() throws SQLException { - if ( isMariaDB() ) + if (isMariaDB()) return true; DatabaseMetaData meta = connection.getMetaData(); @@ -190,13 +186,14 @@ private boolean isMariaDB() throws SQLException { } static final String MARIA_VERSION_REGEX = "[\\d\\.]+-(\\d+)\\.(\\d+)"; + private boolean mariaSupportsJSON() throws SQLException { DatabaseMetaData meta = connection.getMetaData(); String versionString = meta.getDatabaseProductVersion(); Pattern pattern = Pattern.compile(MARIA_VERSION_REGEX); Matcher m = pattern.matcher(versionString); - if ( m.find() ) { + if (m.find()) { int major = Integer.parseInt(m.group(1)); int minor = Integer.parseInt(m.group(2)); @@ -267,7 +264,7 @@ private void captureTablesPK(Database db, HashMap tables) throws String columnName = rs.getString("COLUMN_NAME"); ArrayList pkList = tablePKMap.get(tableName); - if ( pkList != null ) + if (pkList != null) pkList.add(ordinalPosition - 1, columnName); } } @@ -294,7 +291,7 @@ static String[] extractEnumValues(String expandedType) { Matcher enumMatcher = pattern.matcher(enumValues); List result = new ArrayList<>(); - while(enumMatcher.find()) { + while (enumMatcher.find()) { String value = enumMatcher.group(0); if (value.startsWith("'")) value = value.substring(1); @@ -309,44 +306,48 @@ static String[] extractEnumValues(String expandedType) { @Override public void close() throws SQLException { try (PreparedStatement p1 = tablePreparedStatement; - PreparedStatement p2 = columnPreparedStatement; - PreparedStatement p3 = pkPreparedStatement) { + PreparedStatement p2 = columnPreparedStatement; + PreparedStatement p3 = pkPreparedStatement) { // auto-close shared prepared statements } } - private void detectMariaDBJSON(Schema schema) throws SQLException, InvalidSchemaError { + private void detectMariaDBJSON(Schema schema, String dbName) throws SQLException, InvalidSchemaError { String checkConstraintSQL = "SELECT CONSTRAINT_SCHEMA, TABLE_NAME, CONSTRAINT_NAME, CHECK_CLAUSE " + - "from INFORMATION_SCHEMA.CHECK_CONSTRAINTS " + - "where CHECK_CLAUSE LIKE 'json_valid(%)'"; + "from INFORMATION_SCHEMA.CHECK_CONSTRAINTS " + + "where CONSTRAINT_SCHEMA = ? AND CHECK_CLAUSE LIKE 'json_valid(%)'"; String regex = "json_valid\\(`(.*)`\\)"; Pattern pattern = Pattern.compile(regex); try ( - PreparedStatement statement = connection.prepareStatement(checkConstraintSQL); - ResultSet rs = statement.executeQuery() - ) { - while ( rs.next() ) { - String checkClause = rs.getString("CHECK_CLAUSE"); - Matcher m = pattern.matcher(checkClause); - if ( m.find() ) { - String column = m.group(1); - Database d = schema.findDatabase(rs.getString("CONSTRAINT_SCHEMA")); - if ( d == null ) continue; - Table t = d.findTable(rs.getString("TABLE_NAME")); - if ( t == null ) continue; - short i = t.findColumnIndex(column); - if ( i < 0 ) continue; - - ColumnDef cd = t.findColumn(i); - if ( cd instanceof StringColumnDef ) { - t.replaceColumn(i, JsonColumnDef.create(cd.getName(), "json", i)); + PreparedStatement statement = connection.prepareStatement(checkConstraintSQL)) { + statement.setString(1, dbName); + + try (ResultSet rs = statement.executeQuery()) { + while (rs.next()) { + String checkClause = rs.getString("CHECK_CLAUSE"); + Matcher m = pattern.matcher(checkClause); + if (m.find()) { + String column = m.group(1); + Database d = schema.findDatabase(rs.getString("CONSTRAINT_SCHEMA")); + if (d == null) + continue; + Table t = d.findTable(rs.getString("TABLE_NAME")); + if (t == null) + continue; + short i = t.findColumnIndex(column); + if (i < 0) + continue; + + ColumnDef cd = t.findColumn(i); + if (cd instanceof StringColumnDef) { + t.replaceColumn(i, JsonColumnDef.create(cd.getName(), "json", i)); + } } } } } - } }