Skip to content

Commit

Permalink
Merge pull request #105 from dsyzhu/app
Browse files Browse the repository at this point in the history
Add PlanProgramCompileOptions so that customers can choose different strategies to fit their usage
  • Loading branch information
dsyzhu authored Apr 18, 2020
2 parents f9bd069 + f4d3e20 commit cca9ed8
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
package com.yahoo.yqlplus.engine.guice;

import com.google.inject.AbstractModule;
import com.google.inject.Key;
import com.google.inject.multibindings.OptionalBinder;
import com.yahoo.yqlplus.engine.ProgramCompiler;
import com.yahoo.yqlplus.engine.internal.bytecode.ASMClassSourceModule;
import com.yahoo.yqlplus.engine.internal.compiler.PlanProgramCompiler;
import com.yahoo.yqlplus.engine.internal.compiler.streams.PlanProgramCompileOptions;
import com.yahoo.yqlplus.engine.internal.plan.PlanScopedModule;

public class PlannerCompilerModule extends AbstractModule {
Expand All @@ -19,6 +22,7 @@ protected void configure() {
bind(ProgramCompiler.class).to(PlanProgramCompiler.class);
install(new PlanScopedModule());
install(new ASMClassSourceModule());
OptionalBinder.newOptionalBinder(binder(), Key.get(PlanProgramCompileOptions.class))
.setDefault().toInstance(PlanProgramCompileOptions.DEFAULT_OPTIONS);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.yahoo.yqlplus.engine.internal.bytecode.types.gambit.GambitScope;
import com.yahoo.yqlplus.engine.internal.bytecode.types.gambit.GambitSource;
import com.yahoo.yqlplus.engine.internal.bytecode.types.gambit.ScopedBuilder;
import com.yahoo.yqlplus.engine.internal.compiler.streams.PlanProgramCompileOptions;
import com.yahoo.yqlplus.engine.internal.generate.ProgramInvocation;
import com.yahoo.yqlplus.engine.internal.plan.ModuleNamespace;
import com.yahoo.yqlplus.engine.internal.plan.ProgramPlanner;
Expand Down Expand Up @@ -51,10 +52,14 @@ static class CompilerInstance {
private final ProgramPlanner planner;

@Inject
CompilerInstance(ASMClassSource classSource, Injector injector, LogicalTransforms transforms, SourceNamespace sourceNamespace, ModuleNamespace moduleNamespace, ViewRegistry viewNamespace) {
CompilerInstance(ASMClassSource classSource, Injector injector, LogicalTransforms transforms, SourceNamespace sourceNamespace, ModuleNamespace moduleNamespace, ViewRegistry viewNamespace, PlanProgramCompileOptions planProgramCompileOptions) {
this.classSource = classSource;
this.gambitScope = new GambitSource(classSource);
this.planner = new ProgramPlanner(transforms, sourceNamespace, moduleNamespace, gambitScope, viewNamespace);
if (planProgramCompileOptions != null) {
this.planner = new ProgramPlanner(transforms, sourceNamespace, moduleNamespace, gambitScope, viewNamespace, planProgramCompileOptions);
} else {
this.planner = new ProgramPlanner(transforms, sourceNamespace, moduleNamespace, gambitScope, viewNamespace);
}
this.injector = injector;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.yahoo.yqlplus.engine.internal.compiler.streams;

/**
* Created by daisyzhu on 4/17/20.
*/
public class PlanProgramCompileOptions {

public static final PlanProgramCompileOptions DEFAULT_OPTIONS = new PlanProgramCompileOptions.PlanProgramOptionsBuilder().build();

private boolean keepMergeSequential; //enable this option merge tables will be sequential

private PlanProgramCompileOptions(PlanProgramOptionsBuilder builder) {
this.keepMergeSequential = builder.keepMergeSequential;
}

public boolean isKeepMergeSequential() {
return keepMergeSequential;
}

public static final class PlanProgramOptionsBuilder {
private boolean keepMergeSequential;
public PlanProgramOptionsBuilder keepMergeSequential(boolean keepMergeSequential) {
this.keepMergeSequential = keepMergeSequential;
return this;
}

public PlanProgramCompileOptions build() {
return new PlanProgramCompileOptions(this);
}
}

@Override
public String toString() {
return "PlanProgramCompileOptions{" +
"keepMergeSequential=" + keepMergeSequential +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.yahoo.yqlplus.engine.internal.compiler.streams.PlanProgramCompileOptions;
import com.yahoo.yqlplus.engine.internal.plan.ast.OperatorStep;
import com.yahoo.yqlplus.engine.internal.plan.ast.OperatorValue;
import com.yahoo.yqlplus.engine.internal.tasks.*;
Expand Down Expand Up @@ -162,8 +163,12 @@ OperatorNode<TaskOperator> fork(ForkTask task) {
}

public OperatorNode<TaskOperator> planTask(List<OperatorNode<TaskOperator>> arguments, Step root) {
return planTask(arguments, root, null);
}

public OperatorNode<TaskOperator> planTask(List<OperatorNode<TaskOperator>> arguments, Step root, PlanProgramCompileOptions planProgramCompileOptions) {
GraphPlanner graphPlanner = new GraphPlanner();
ForkTask fork = graphPlanner.plan(root);
ForkTask fork = graphPlanner.plan(root, planProgramCompileOptions);
PlanState state = new PlanState();
OperatorNode<TaskOperator> start = state.fork(fork);
return OperatorNode.create(TaskOperator.PLAN, start, arguments, new Sorter(state.outputMap, state.tasks).sort(start));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.yahoo.yqlplus.engine.api.DependencyNotFoundException;
import com.yahoo.yqlplus.engine.api.ViewRegistry;
import com.yahoo.yqlplus.engine.internal.bytecode.types.gambit.GambitScope;
import com.yahoo.yqlplus.engine.internal.compiler.streams.PlanProgramCompileOptions;
import com.yahoo.yqlplus.engine.internal.plan.ast.OperatorStep;
import com.yahoo.yqlplus.engine.internal.plan.ast.OperatorValue;
import com.yahoo.yqlplus.engine.internal.plan.ast.PhysicalExprOperator;
Expand Down Expand Up @@ -73,11 +74,16 @@ public class ProgramPlanner implements ViewRegistry {
private final GambitScope adapter;
private final EnumSet<CompiledProgram.ProgramStatement> writeStatements = EnumSet.noneOf(CompiledProgram.ProgramStatement.class);
private final ViewRegistry parentViews;
private final PlanProgramCompileOptions planProgramCompileOptions;

private final Map<String, SourceType> resolvedSources = Maps.newHashMap();
private final Map<String, ModuleType> resolvedModules = Maps.newHashMap();

public ProgramPlanner(LogicalTransforms transforms, SourceNamespace sourceNamespace, ModuleNamespace moduleNamespace, GambitScope gambitScope, ViewRegistry viewNamespace) {
this(transforms, sourceNamespace, moduleNamespace, gambitScope, viewNamespace, null);
}

public ProgramPlanner(LogicalTransforms transforms, SourceNamespace sourceNamespace, ModuleNamespace moduleNamespace, GambitScope gambitScope, ViewRegistry viewNamespace, PlanProgramCompileOptions planProgramCompileOptions) {
this.logicalTransforms = transforms;
this.sourceNamespace = sourceNamespace;
this.moduleNamespace = moduleNamespace;
Expand All @@ -86,6 +92,7 @@ public ProgramPlanner(LogicalTransforms transforms, SourceNamespace sourceNamesp
this.views = Maps.newHashMap();
this.adapter = gambitScope;
this.parentViews = viewNamespace;
this.planProgramCompileOptions = planProgramCompileOptions;
}

public SourceType findSource(ContextPlanner contextPlanner, OperatorNode<SequenceOperator> source) {
Expand Down Expand Up @@ -234,7 +241,7 @@ public OperatorNode<TaskOperator> plan(OperatorNode<StatementOperator> program)
}
OperatorValue end = OperatorStep.create(getValueTypeAdapter(), PhysicalOperator.END, terminals);
PlanTree tree = new PlanTree();
OperatorNode<TaskOperator> x = tree.planTask(arguments, end.getSource());
OperatorNode<TaskOperator> x = tree.planTask(arguments, end.getSource(), planProgramCompileOptions);
x.putAnnotation("yql.writeStatements", writeStatements);
return x;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@
package com.yahoo.yqlplus.engine.internal.tasks;

import com.google.common.collect.*;
import com.yahoo.yqlplus.engine.internal.compiler.streams.PlanProgramCompileOptions;
import com.yahoo.yqlplus.engine.internal.plan.ast.OperatorStep;
import com.yahoo.yqlplus.engine.internal.plan.ast.PhysicalOperator;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.yahoo.rdl.BaseType.Map;

/**
* Plan the execution of steps as a graph of Tasks.
* <p/>
Expand Down Expand Up @@ -70,10 +69,15 @@ private void populateAvailable(Node node, Set<Value> available, Map<Step, Node>
}
}

public ForkTask plan(Step root) {
return plan(root, null);
}

/**
* Plan a graph of tasks using the terminal step as a starting point. Discover all of the used steps from those roots, and then return the starting task.
*/
public ForkTask plan(Step root) {

public ForkTask plan(Step root, PlanProgramCompileOptions planProgramCompileOptions) {
Map<Step, Node> nodes = Maps.newIdentityHashMap();

discover(root, nodes);
Expand Down Expand Up @@ -119,19 +123,23 @@ public ForkTask plan(Step root) {
target.available.addAll(node.available);
nodes.remove(key);

//check if all non-END nodes only have one todo
boolean found = false;
for (Map.Entry<Step, Node> entry : nodes.entrySet()) {
if (entry.getKey() instanceof OperatorStep) {
if (!((OperatorStep) entry.getKey()).getCompute().getOperator().equals(PhysicalOperator.END)) {
if (!(entry.getValue().todo.size() == 1)) {
found = true;
break;
if (planProgramCompileOptions != null && !planProgramCompileOptions.isKeepMergeSequential()) {
//check if all non-END nodes only have one todo
boolean found = false;
for (Map.Entry<Step, Node> entry : nodes.entrySet()) {
if (entry.getKey() instanceof OperatorStep) {
if (!((OperatorStep) entry.getKey()).getCompute().getOperator().equals(PhysicalOperator.END)) {
if (!(entry.getValue().todo.size() == 1)) {
found = true;
break;
}
}
}
}
modified = found;
} else {
modified = true;
}
modified = found;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.multibindings.OptionalBinder;
import com.google.inject.name.Names;
import com.yahoo.cloud.metrics.api.MetricDimension;
import com.yahoo.cloud.metrics.api.MetricType;
Expand All @@ -38,6 +39,7 @@
import com.yahoo.yqlplus.engine.YQLResultSet;
import com.yahoo.yqlplus.engine.api.DependencyNotFoundException;
import com.yahoo.yqlplus.engine.api.Record;
import com.yahoo.yqlplus.engine.internal.compiler.streams.PlanProgramCompileOptions;
import com.yahoo.yqlplus.engine.java.JavaTestModule.MetricModule;
import com.yahoo.yqlplus.engine.scope.ExecutionScope;
import com.yahoo.yqlplus.engine.scope.MapExecutionScope;
Expand All @@ -53,6 +55,7 @@
import com.yahoo.yqlplus.engine.sources.ErrorSource;
import com.yahoo.yqlplus.engine.sources.ExecuteScopedInjectedSource;
import com.yahoo.yqlplus.engine.sources.FRSource;
import com.yahoo.yqlplus.engine.sources.GenericFieldResultSource;
import com.yahoo.yqlplus.engine.sources.InjectedArgumentSource;
import com.yahoo.yqlplus.engine.sources.InsertMovieSourceSingleField;
import com.yahoo.yqlplus.engine.sources.InsertSourceMissingSetAnnotation;
Expand Down Expand Up @@ -91,7 +94,6 @@
import com.yahoo.yqlplus.engine.sources.SampleListSourceWithBoxedParams;
import com.yahoo.yqlplus.engine.sources.SampleListSourceWithUnboxedParams;
import com.yahoo.yqlplus.engine.sources.SampleResultSource;
import com.yahoo.yqlplus.engine.sources.GenericFieldResultSource;
import com.yahoo.yqlplus.engine.sources.SingleIntegerKeySource;
import com.yahoo.yqlplus.engine.sources.SingleIntegerKeySourceWithSkipEmptyOrZeroSetToTrue;
import com.yahoo.yqlplus.engine.sources.SingleKeySource;
Expand Down Expand Up @@ -145,6 +147,30 @@ public void testPerformanceIssue() throws Exception {
assertFalse(samples.get(0).id.equals(samples.get(1).id));
assertTrue(Math.max(samples.get(0).start, samples.get(1).start) < Math.min(samples.get(0).end, samples.get(1).end));
}

@Test
public void testMergeSequentialOption() throws Exception {
Injector injector = Guice.createInjector(new JavaTestModule(), new SourceBindingModule("sample", SampleExecutionSource.class), new AbstractModule() {
@Override
protected void configure() {
OptionalBinder.newOptionalBinder(binder(), com.google.inject.Key.get(PlanProgramCompileOptions.class))
.setBinding().toInstance(new PlanProgramCompileOptions.PlanProgramOptionsBuilder().keepMergeSequential(true).build());
}
});
YQLPlusCompiler compiler = injector.getInstance(YQLPlusCompiler.class);
String programStr = "CREATE TEMP TABLE sample1 AS (SELECT * FROM sample('id1')); \n" +
"CREATE TEMP TABLE sample2 AS (SELECT * FROM sample('id2')); \n" +
"SELECT * FROM sample1 \n" +
"MERGE \n" +
"SELECT * FROM sample2 \n" +
"OUTPUT AS samples;";
CompiledProgram program = compiler.compile(programStr);
ProgramResult programResult = program.run(ImmutableMap.<String, Object>of(), true);
List<SampleExecutionSource.Sample> samples = programResult.getResult("samples").get().getResult();
assertEquals(2, samples.size());
assertFalse(samples.get(0).id.equals(samples.get(1).id));
assertTrue(samples.get(0).end <= samples.get(1).start || samples.get(1).end <= samples.get(0).start);
}

@Test
public void testGenericResult() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
import com.google.inject.AbstractModule;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.name.Names;
import com.yahoo.cloud.metrics.api.*;
import com.yahoo.cloud.metrics.api.MetricDimension;
import com.yahoo.cloud.metrics.api.MetricEvent;
import com.yahoo.cloud.metrics.api.MetricSink;
import com.yahoo.cloud.metrics.api.RequestEvent;
import com.yahoo.cloud.metrics.api.RequestMetricSink;
import com.yahoo.cloud.metrics.api.StandardRequestEmitter;
import com.yahoo.cloud.metrics.api.TaskMetricEmitter;
import com.yahoo.yqlplus.api.Source;
import com.yahoo.yqlplus.api.annotations.ExecuteScoped;
import com.yahoo.yqlplus.api.guice.SeededKeyProvider;
Expand Down

0 comments on commit cca9ed8

Please sign in to comment.