Skip to content

Commit

Permalink
wrangler sql execution initial setup
Browse files Browse the repository at this point in the history
  • Loading branch information
harshdeeppruthi committed Dec 15, 2023
1 parent 5790e4b commit 9452e30
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 53 deletions.
12 changes: 11 additions & 1 deletion wrangler-api/src/main/java/io/cdap/wrangler/api/Directive.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.cdap.wrangler.api;

import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.wrangler.api.parser.UsageDefinition;

import java.util.List;
Expand Down Expand Up @@ -51,7 +53,8 @@
* }
* </code>
*/
public interface Directive extends Executor<List<Row>, List<Row>>, EntityMetrics {
public interface Directive extends Executor<List<Row>, List<Row>>, EntityMetrics,
DirectiveRelationalTransform {
/**
* This defines a interface variable that is static and final for specify
* the {@code type} of the plugin this interface would provide.
Expand Down Expand Up @@ -126,4 +129,11 @@ default List<EntityCountMetric> getCountMetrics() {
// no op
return null;
}

@Override
default Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {
// no-op
return relation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.wrangler.api;

import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.LinearRelationalTransform;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;

/**
* {@link DirectiveRelationalTransform} provides relational transform support for
* wrangler directives.
*/
public interface DirectiveRelationalTransform extends LinearRelationalTransform {

/**
* Implementation of linear relational transform for each supported directive.
*
* @param relationalTranformContext transformation context with engine, input and output parameters
* @param relation input relation upon which the transformation is applied.
* @return transformed relation as the output relation. By default, returns an Invalid relation
* for unsupported directives.
*/
default Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {
return new InvalidRelation("SQL execution for the directive is currently not supported.");
}

/**
* Indicates whether the directive is supported by relational transformation or not.
*
* @return boolean value for the directive SQL support.
* By default, returns false, indicating that the directive is currently not supported.
*/
default boolean isSQLSupported() {
return false;
}

}
16 changes: 16 additions & 0 deletions wrangler-core/src/main/java/io/cdap/directives/column/Drop.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand Down Expand Up @@ -88,4 +90,18 @@ public Mutation lineage() {
.drop(Many.of(columns))
.build();
}

@Override
public Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {
for (String col: columns) {
relation = relation.dropColumn(col);
}
return relation;
}

@Override
public boolean isSQLSupported() {
return true;
}
}
136 changes: 106 additions & 30 deletions wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import io.cdap.cdap.etl.api.relational.Expression;
import io.cdap.cdap.etl.api.relational.ExpressionFactory;
import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.LinearRelationalTransform;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.cdap.etl.api.relational.StringExpressionFactoryType;
Expand All @@ -50,9 +49,11 @@
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveLoadException;
import io.cdap.wrangler.api.DirectiveParseException;
import io.cdap.wrangler.api.DirectiveRelationalTransform;
import io.cdap.wrangler.api.EntityCountMetric;
import io.cdap.wrangler.api.ErrorRecord;
import io.cdap.wrangler.api.ExecutorContext;
import io.cdap.wrangler.api.RecipeException;
import io.cdap.wrangler.api.RecipeParser;
import io.cdap.wrangler.api.RecipePipeline;
import io.cdap.wrangler.api.RecipeSymbol;
Expand Down Expand Up @@ -103,7 +104,8 @@
@Plugin(type = "transform")
@Name("Wrangler")
@Description("Wrangler - A interactive tool for data cleansing and transformation.")
public class Wrangler extends Transform<StructuredRecord, StructuredRecord> implements LinearRelationalTransform {
public class Wrangler extends Transform<StructuredRecord, StructuredRecord> implements DirectiveRelationalTransform {

private static final Logger LOG = LoggerFactory.getLogger(Wrangler.class);

// Configuration specifying the dataprep application and service name.
Expand All @@ -124,6 +126,12 @@ public class Wrangler extends Transform<StructuredRecord, StructuredRecord> impl
private static final String PRECONDITION_LANGUAGE_JEXL = "jexl";
private static final String PRECONDITION_LANGUAGE_SQL = "sql";

// Sql execution value
private static final String SQL_ENABLED = "yes";

// wrangler sql execution mode enabled or not
public boolean isSqlEnabled = false;

// Plugin configuration.
private final Config config;

Expand Down Expand Up @@ -172,7 +180,7 @@ public void configurePipeline(PipelineConfigurer configurer) {
try {
Schema iSchema = configurer.getStageConfigurer().getInputSchema();
if (!config.containsMacro(Config.NAME_FIELD) && !(config.getField().equals("*")
|| config.getField().equals("#"))) {
|| config.getField().equals("#"))) {
validateInputSchema(iSchema, collector);
}

Expand All @@ -185,12 +193,22 @@ public void configurePipeline(PipelineConfigurer configurer) {
}
}

if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) {
if (PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) {
isSqlEnabled = checkSQLExecutionEnabled(config);
if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)
|| !config.containsMacro(Config.NAME_SQL_EXECUTION)) {
if (!config.containsMacro(Config.NAME_SQL_EXECUTION)) {
if (isSqlEnabled && PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage())) {
collector.addFailure("JEXL Precondition is not supported when SQL execution is enabled.", null);
}
if (!isSqlEnabled && PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) {
collector.addFailure("SQL Precondition is only supported when SQL execution is enabled.", null);
}
}
if (isSqlEnabled || PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) {
if (!config.containsMacro(Config.NAME_PRECONDITION_SQL)) {
validatePrecondition(config.getPreconditionSQL(), true, collector);
}
validateSQLModeDirectives(collector);
validateSQLUDDs(collector);
} else {
if (!config.containsMacro(Config.NAME_PRECONDITION)) {
validatePrecondition(config.getPreconditionJEXL(), false, collector);
Expand Down Expand Up @@ -248,6 +266,12 @@ public void configurePipeline(PipelineConfigurer configurer) {
collector.addFailure(e.getMessage(), null);
}

// If SQL Execution is enabled, check that the directives supports sql execution
if (isSqlEnabled) {
List<Directive> sqlDirectives = getDirectivesList(config);
validateSQLModeDirectives(collector, sqlDirectives);
}

// Based on the configuration create output schema.
try {
if (!config.containsMacro(Config.NAME_SCHEMA)) {
Expand All @@ -259,7 +283,7 @@ public void configurePipeline(PipelineConfigurer configurer) {
}

// Check if jexl pre-condition is not null or empty and if so compile expression.
if (!config.containsMacro(Config.NAME_PRECONDITION) && !config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) {
if (!isSqlEnabled && !config.containsMacro(Config.NAME_PRECONDITION)) {
if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage())
&& checkPreconditionNotEmpty(false)) {
try {
Expand Down Expand Up @@ -352,7 +376,7 @@ public void initialize(TransformContext context) throws Exception {
}

// Check if jexl pre-condition is not null or empty and if so compile expression.
if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) {
if (!isSqlEnabled && !config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) {
if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage())
&& checkPreconditionNotEmpty(false)) {
try {
Expand Down Expand Up @@ -411,7 +435,7 @@ public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter)
}

// If pre-condition is set, then evaluate the precondition
if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage())
if (!isSqlEnabled && PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage())
&& checkPreconditionNotEmpty(false)) {
boolean skip = condition.apply(row);
if (skip) {
Expand Down Expand Up @@ -520,12 +544,17 @@ private void validatePrecondition(String precondition, Boolean isConditionSQL, F
}
}

private void validateSQLModeDirectives(FailureCollector collector) {
if (!Strings.isNullOrEmpty(config.getDirectives())) {
collector.addFailure("Directives are not supported for precondition of type SQL", null)
.withConfigProperty(Config.NAME_DIRECTIVES);
private void validateSQLModeDirectives(FailureCollector collector, List<Directive> directives) {
for (Directive directive : directives) {
if (!directive.isSQLSupported()) {
collector.addFailure(String.format("%s directive is not supported by SQL execution.",
directive.define().getDirectiveName()), null)
.withConfigProperty(Config.NAME_DIRECTIVES);
}
}
}

private void validateSQLUDDs(FailureCollector collector) {
if (!Strings.isNullOrEmpty(config.getUDDs())) {
collector.addFailure("UDDs are not supported for precondition of type SQL", null)
.withConfigProperty(Config.NAME_UDD);
Expand All @@ -544,6 +573,23 @@ private boolean checkPreconditionNotEmpty(Boolean isConditionSQL) {
return false;
}

private boolean checkSQLExecutionEnabled(Config config) {
if (!Feature.WRANGLER_EXECUTION_SQL.isEnabled(getContext())) {
return false;
}

return SQL_ENABLED.equalsIgnoreCase(config.getSqlExecution());
}

List<Directive> getDirectivesList(Config config) throws DirectiveParseException, RecipeException {
String recipe = config.getDirectives();
List<Directive> directives = null;
GrammarBasedParser parser = new GrammarBasedParser("default",
new MigrateToV2(recipe).migrate(), registry);
directives = parser.parse();
return directives;
}

/**
* This method creates a {@link CompositeDirectiveRegistry} and initializes the {@link RecipeParser}
* with {@link NoOpDirectiveContext}
Expand All @@ -569,23 +615,46 @@ private RecipeParser getRecipeParser(StageContext context)

@Override
public Relation transform(RelationalTranformContext relationalTranformContext, Relation relation) {
if (PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())
&& checkPreconditionNotEmpty(true)) {
isSqlEnabled = checkSQLExecutionEnabled(config);
if (!Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)
&& !Feature.WRANGLER_EXECUTION_SQL.isEnabled(relationalTranformContext)) {
return new InvalidRelation("Plugin is not configured for relational transformation");
}

if (Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)) {
if ((isSqlEnabled || PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage()))
&& checkPreconditionNotEmpty(true)) {
Optional<ExpressionFactory<String>> expressionFactory = getExpressionFactory(relationalTranformContext);
if (!expressionFactory.isPresent()) {
return new InvalidRelation("Cannot find an Expression Factory");
}
Expression filterExpression = expressionFactory.get().compile(config.getPreconditionSQL());
relation = relation.filter(filterExpression);
}
}

if (!Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)) {
throw new RuntimeException("SQL Precondition feature is not available");
if (isSqlEnabled) {
registry = SystemDirectiveRegistry.INSTANCE;
try {
registry.reload("default");
} catch (DirectiveLoadException e) {
throw new RuntimeException(e);
}

Optional<ExpressionFactory<String>> expressionFactory = getExpressionFactory(relationalTranformContext);
if (!expressionFactory.isPresent()) {
return new InvalidRelation("Cannot find an Expression Factory");
List<Directive> directives = null;
try {
directives = getDirectivesList(config);
} catch (DirectiveParseException | RecipeException e) {
throw new RuntimeException(e);
}

Expression filterExpression = expressionFactory.get().compile(config.getPreconditionSQL());
return relation.filter(filterExpression);
for (Directive directive : directives) {
relation = directive
.transform(relationalTranformContext, relation);
}
}

return new InvalidRelation("Plugin is not configured for relational transformation");
return relation;
}

private Optional<ExpressionFactory<String>> getExpressionFactory(RelationalTranformContext ctx) {
Expand Down Expand Up @@ -637,6 +706,8 @@ private Map<String, String> getEntityMetricTags(EntityCountMetric metricDef) {
* Config for the plugin.
*/
public static class Config extends PluginConfig {

static final String NAME_SQL_EXECUTION = "sqlExecution";
static final String NAME_PRECONDITION = "precondition";
static final String NAME_PRECONDITION_SQL = "preconditionSQL";
static final String NAME_PRECONDITION_LANGUAGE = "expressionLanguage";
Expand All @@ -646,6 +717,11 @@ public static class Config extends PluginConfig {
static final String NAME_SCHEMA = "schema";
static final String NAME_ON_ERROR = "on-error";

@Name(NAME_SQL_EXECUTION)
@Description("Toggle to configure execution language between JEXL and SQL")
@Macro
@Nullable
private String sqlExecution;
@Name(NAME_PRECONDITION_LANGUAGE)
@Description("Toggle to configure precondition language between JEXL and SQL")
@Macro
Expand Down Expand Up @@ -693,8 +769,9 @@ public static class Config extends PluginConfig {
@Nullable
private final String onError;

public Config(String preconditionLanguage, String precondition, String directives, String udds,
String field, String schema, String onError) {
public Config(String sqlExecution, String preconditionLanguage, String precondition, String directives, String udds,
String field, String schema, String onError) {
this.sqlExecution = sqlExecution;
this.preconditionLanguage = preconditionLanguage;
this.precondition = precondition;
this.directives = directives;
Expand All @@ -713,13 +790,13 @@ public String getOnError() {
}

public String getPreconditionLanguage() {
if (Strings.isNullOrEmpty(preconditionLanguage)) {
// due to backward compatibility...
return PRECONDITION_LANGUAGE_JEXL;
}
return preconditionLanguage;
}

public String getSqlExecution() {
return sqlExecution;
}

public String getPreconditionJEXL() {
return precondition;
}
Expand All @@ -741,4 +818,3 @@ public String getUDDs() {
}
}
}

Loading

0 comments on commit 9452e30

Please sign in to comment.