Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-14171: Honor catalog/schema field in UpdateDatabaseTable when executing ALTER table #9645

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ private synchronized OutputMetadataHolder checkAndUpdateTableSchema(final Connec
}

if (!columnsToAdd.isEmpty()) {
final List<String> alterTableSqlStatements = databaseAdapter.getAlterTableStatements(tableName, columnsToAdd, quoteTableName, quoteColumnNames);
final List<String> alterTableSqlStatements = databaseAdapter.getAlterTableStatements(tableSchema, columnsToAdd, quoteTableName, quoteColumnNames);

if (alterTableSqlStatements != null && !alterTableSqlStatements.isEmpty()) {
for (String alterTableSql : alterTableSqlStatements) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,14 @@ default String getCreateTableStatement(TableSchema tableSchema, boolean quoteTab
return createTableStatement.toString();
}

default List<String> getAlterTableStatements(TableSchema tableSchema, List<ColumnDescription> columnsToAdd, boolean quoteTableName, boolean quoteColumnNames) {
String tableName = generateTableName(quoteTableName, tableSchema.getCatalogName(), tableSchema.getSchemaName(), tableSchema.getTableName(), tableSchema);
// `quoteTableName` is always passed as false, since `tableName` is already quoted by `generateTableName` if needed. We don't want to quote twice.
return getAlterTableStatements(tableName, columnsToAdd, false, quoteColumnNames);
}

default List<String> getAlterTableStatements(String tableName, List<ColumnDescription> columnsToAdd, final boolean quoteTableName, final boolean quoteColumnNames) {
StringBuilder createTableStatement = new StringBuilder();
StringBuilder alterTableStatement = new StringBuilder();

List<String> columnsAndDatatypes = new ArrayList<>(columnsToAdd.size());
for (ColumnDescription column : columnsToAdd) {
Expand All @@ -200,15 +206,15 @@ default List<String> getAlterTableStatements(String tableName, List<ColumnDescri
columnsAndDatatypes.add(sb.toString());
}

createTableStatement.append("ALTER TABLE ")
alterTableStatement.append("ALTER TABLE ")
.append(quoteTableName ? getTableQuoteString() : "")
.append(tableName)
.append(quoteTableName ? getTableQuoteString() : "")
.append(" ADD COLUMNS (")
.append(String.join(", ", columnsAndDatatypes))
.append(") ");

return List.of(createTableStatement.toString());
return List.of(alterTableStatement.toString());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,13 @@ public void setup() {
}

try (Statement s = service.getConnection().createStatement()) {
s.execute("DROP SCHEMA \"testSchema\"");
s.execute("DROP TABLE \"testSchema\".\"persons\"");
} catch (SQLException se) {
// Ignore, schema and/or table probably doesn't exist
}

try (Statement s = service.getConnection().createStatement()) {
s.execute("DROP SCHEMA \"testSchema\" restrict");
} catch (SQLException se) {
// Ignore, schema probably doesn't exist
}
Expand Down Expand Up @@ -493,6 +499,80 @@ public void testCreateTableNonDefaultSchema() throws Exception {
}
}

@Test
public void testAddColumnToExistingTableNonDefaultSchema() throws Exception {
runner = TestRunners.newTestRunner(processor);
try (final Connection conn = service.getConnection()) {
String defaultSchema = conn.getSchema();
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createSchema);
conn.setSchema("testSchema");
stmt.executeUpdate(createPersons);
} finally {
conn.setSchema(defaultSchema);
}

MockRecordParser readerFactory = new MockRecordParser();

readerFactory.addSchemaField(new RecordField("id", RecordFieldType.INT.getDataType(), false));
readerFactory.addSchemaField(new RecordField("name", RecordFieldType.STRING.getDataType(), true));
readerFactory.addSchemaField(new RecordField("code", RecordFieldType.INT.getDataType(), 0, true));
readerFactory.addSchemaField(new RecordField("newField", RecordFieldType.STRING.getDataType(), 0, true));
readerFactory.addRecord(1, "name1", null, "test");

runner.addControllerService("mock-reader-factory", readerFactory);
runner.enableControllerService(readerFactory);

runner.setProperty(UpdateDatabaseTable.RECORD_READER, "mock-reader-factory");
runner.setProperty(UpdateDatabaseTable.SCHEMA_NAME, "testSchema");
runner.setProperty(UpdateDatabaseTable.TABLE_NAME, "${table.name}");
runner.setProperty(UpdateDatabaseTable.CREATE_TABLE, UpdateDatabaseTable.FAIL_IF_NOT_EXISTS);
runner.setProperty(UpdateDatabaseTable.QUOTE_TABLE_IDENTIFIER, "true");
runner.setProperty(UpdateDatabaseTable.QUOTE_COLUMN_IDENTIFIERS, "false");
runner.setProperty(UpdateDatabaseTable.DB_TYPE, new DerbyDatabaseAdapter().getName());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, "dbcp");
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "persons");
runner.enqueue(new byte[0], attrs);
runner.run();

runner.assertTransferCount(UpdateDatabaseTable.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateDatabaseTable.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateDatabaseTable.ATTR_OUTPUT_TABLE, "persons");
// Verify the table has been updated with the expected field(s)
try (Statement s = conn.createStatement()) {
// The Derby equivalent of DESCRIBE TABLE (using a query rather than the ij tool)
ResultSet rs = s.executeQuery("SELECT * FROM SYS.SYSCOLUMNS WHERE referenceid = (SELECT tableid FROM SYS.SYSTABLES WHERE tablename = 'persons') ORDER BY columnnumber");
assertTrue(rs.next());
// Columns 2,3,4 are Column Name, Column Index, and Column Type
assertEquals("id", rs.getString(2));
assertEquals(1, rs.getInt(3));
// Primary key cannot be null, Derby stores that in this column
assertEquals("INTEGER NOT NULL", rs.getString(4));

assertTrue(rs.next());
assertEquals("name", rs.getString(2));
assertEquals(2, rs.getInt(3));
assertEquals("VARCHAR(100)", rs.getString(4));

assertTrue(rs.next());
assertEquals("code", rs.getString(2));
assertEquals(3, rs.getInt(3));
assertEquals("INTEGER", rs.getString(4));

assertTrue(rs.next());
assertEquals("NEWFIELD", rs.getString(2));
assertEquals(4, rs.getInt(3));
assertEquals("VARCHAR(100)", rs.getString(4));

// No more rows
assertFalse(rs.next());
}
}
}

/**
* Simple implementation only for testing purposes
Expand Down