Skip to content

Commit

Permalink
NIFI-14129 Added Database Dialect Service
Browse files Browse the repository at this point in the history
- Added database-dialect-service-api
- Added Standard Database Dialect Service implementation
- Added Database Adapter implementation
- Added Database Dialect Service property descriptor to Database Processors
- Refactored Database Processors with optional Database Dialect Service
  • Loading branch information
exceptionfactory committed Jan 16, 2025
1 parent f4b2f04 commit ebbfffb
Show file tree
Hide file tree
Showing 54 changed files with 2,296 additions and 865 deletions.
6 changes: 6 additions & 0 deletions nifi-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,12 @@ language governing permissions and limitations under the License. -->
<version>2.2.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-database-dialect-service-nar</artifactId>
<version>2.2.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-client-service-api-nar</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-database-dialect-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client-provider-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,24 @@
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.database.dialect.service.api.ColumnDefinition;
import org.apache.nifi.database.dialect.service.api.StandardColumnDefinition;
import org.apache.nifi.database.dialect.service.api.DatabaseDialectService;
import org.apache.nifi.database.dialect.service.api.QueryClause;
import org.apache.nifi.database.dialect.service.api.QueryClauseType;
import org.apache.nifi.database.dialect.service.api.QueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StandardQueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StatementResponse;
import org.apache.nifi.database.dialect.service.api.StatementType;
import org.apache.nifi.database.dialect.service.api.TableDefinition;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.processors.standard.db.DatabaseAdapterDescriptor;
import org.apache.nifi.processors.standard.db.impl.DatabaseAdapterDatabaseDialectService;
import org.apache.nifi.processors.standard.db.impl.DatabaseDialectServiceDatabaseAdapter;

import java.sql.Connection;
import java.sql.ResultSet;
Expand All @@ -38,37 +50,17 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Optional;
import java.util.stream.Collectors;

@Tags({"database", "dbcp", "sql"})
@CapabilityDescription("Fetches parameters from database tables")

public class DatabaseParameterProvider extends AbstractParameterProvider implements VerifiableParameterProvider {

protected final static Map<String, DatabaseAdapter> dbAdapters = new HashMap<>();

public static final PropertyDescriptor DB_TYPE;

static {
// Load the DatabaseAdapters
ArrayList<AllowableValue> dbAdapterValues = new ArrayList<>();
ServiceLoader<DatabaseAdapter> dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class);
dbAdapterLoader.forEach(it -> {
dbAdapters.put(it.getName(), it);
dbAdapterValues.add(new AllowableValue(it.getName(), it.getName(), it.getDescription()));
});

DB_TYPE = new PropertyDescriptor.Builder()
.name("db-type")
.displayName("Database Type")
.description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type "
+ "should suffice, but some databases (such as Oracle) require custom SQL clauses. ")
.allowableValues(dbAdapterValues.toArray(new AllowableValue[dbAdapterValues.size()]))
.defaultValue("Generic")
.required(true)
.build();
}
public static final PropertyDescriptor DB_TYPE = DatabaseAdapterDescriptor.getDatabaseTypeDescriptor("db-type");

public static final PropertyDescriptor DATABASE_DIALECT_SERVICE = DatabaseAdapterDescriptor.getDatabaseDialectServiceDescriptor(DB_TYPE);

static AllowableValue GROUPING_BY_COLUMN = new AllowableValue("grouping-by-column", "Column",
"A single table is partitioned by the 'Parameter Group Name Column'. All rows with the same value in this column will " +
Expand Down Expand Up @@ -149,6 +141,7 @@ public class DatabaseParameterProvider extends AbstractParameterProvider impleme
protected void init(final ParameterProviderInitializationContext config) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(DB_TYPE);
properties.add(DATABASE_DIALECT_SERVICE);
properties.add(DBCP_SERVICE);
properties.add(PARAMETER_GROUPING_STRATEGY);
properties.add(TABLE_NAME);
Expand Down Expand Up @@ -233,8 +226,22 @@ private void validateValueNotNull(final String value, final String columnName) {
}

String getQuery(final ConfigurationContext context, final String tableName, final List<String> columns, final String whereClause) {
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
return dbAdapter.getSelectStatement(tableName, StringUtils.join(columns, ", "), whereClause, null, null, null);
final DatabaseDialectService databaseDialectService = getDatabaseDialectService(context);

final List<ColumnDefinition> columnDefinitions = columns.stream()
.map(StandardColumnDefinition::new)
.map(ColumnDefinition.class::cast)
.toList();
final TableDefinition tableDefinition = new TableDefinition(Optional.empty(), Optional.empty(), tableName, columnDefinitions);
final QueryStatementRequest queryStatementRequest = new StandardQueryStatementRequest(
StatementType.SELECT,
tableDefinition,
Optional.empty(),
List.of(new QueryClause(QueryClauseType.WHERE, whereClause)),
Optional.empty()
);
final StatementResponse statementResponse = databaseDialectService.getStatement(queryStatementRequest);
return statementResponse.sql();
}

@Override
Expand Down Expand Up @@ -262,4 +269,15 @@ public List<ConfigVerificationResult> verify(final ConfigurationContext context,

return results;
}

private DatabaseDialectService getDatabaseDialectService(final PropertyContext context) {
final DatabaseDialectService databaseDialectService;
final String databaseType = context.getProperty(DB_TYPE).getValue();
if (DatabaseDialectServiceDatabaseAdapter.NAME.equals(databaseType)) {
databaseDialectService = context.getProperty(DATABASE_DIALECT_SERVICE).asControllerService(DatabaseDialectService.class);
} else {
databaseDialectService = new DatabaseAdapterDatabaseDialectService(databaseType);
}
return databaseDialectService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,10 @@
<artifactId>nifi-dbcp-service-api</artifactId>
<version>2.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-database-dialect-service-api</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
Expand Down
Loading

0 comments on commit ebbfffb

Please sign in to comment.