Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trying out cascading 3.2 #70

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ limitations under the License.

<properties>
<!-- dependency versions -->
<cascading.version>3.1.0</cascading.version>
<flink.version>1.0.3</flink.version>
<cascading.version>3.2.0-wip-6</cascading.version>
<flink.version>1.1.3</flink.version>
<slf4j.version>1.7.7</slf4j.version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down Expand Up @@ -204,14 +204,7 @@ limitations under the License.
<type>jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.0.2</version>
<type>jar</type>
</dependency>


<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.dataartisans.flink.cascading.planner.rules.BoundaryAfterSourceTapTransformer;
import com.dataartisans.flink.cascading.planner.rules.DoubleBoundaryRemovalTransformer;
import com.dataartisans.flink.cascading.planner.rules.TopDownSplitBoundariesNodePartitioner;
import org.apache.flink.api.java.ExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -56,12 +57,20 @@ public class FlinkConnector extends FlowConnector {

List<String> classPath = new ArrayList<String>();

private ExecutionEnvironment env;

/**
 * Creates a connector with empty properties; delegates to the
 * properties-based constructor, which picks the default ExecutionEnvironment.
 */
public FlinkConnector() {
this(new Properties());
}

/**
 * Creates a connector for the given Cascading properties, using the
 * environment returned by {@code ExecutionEnvironment.getExecutionEnvironment()}
 * (local or cluster context, depending on how the program is launched).
 *
 * @param properties Cascading flow properties
 */
public FlinkConnector(Map<Object, Object> properties) {
this(ExecutionEnvironment.getExecutionEnvironment(), properties);
}

/**
 * Creates a connector that plans and executes flows against the given
 * Flink execution environment (the env is later handed to the FlinkPlanner).
 *
 * @param env        Flink execution environment to run flows on
 * @param properties Cascading flow properties passed to FlowConnector
 */
public FlinkConnector(ExecutionEnvironment env, Map<Object, Object> properties) {

super(properties);
this.env = env;
}

@Override
Expand All @@ -71,7 +80,7 @@ protected Class<? extends Scheme> getDefaultIntermediateSchemeClass() {

@Override
protected FlowPlanner createFlowPlanner() {
return new FlinkPlanner(classPath);
return new FlinkPlanner(env, classPath);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.DataStatistics;
Expand Down Expand Up @@ -68,7 +69,7 @@ public class FlinkFlowStepJob extends FlowStepJob<Configuration>

private final Configuration currentConf;

private Client client;
private ClusterClient client;

private JobID jobID;
private Throwable jobException;
Expand Down Expand Up @@ -169,7 +170,20 @@ else if (FlinkConfigConstants.EXECUTION_MODE_PIPELINED.equals(execMode)) {
org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, localCluster.hostname());

client = new Client(config);
final org.apache.flink.configuration.Configuration tmpConfig = localCluster.generateConfiguration(localCluster.configuration());

final int resourceManagerPort = tmpConfig.getInteger(
ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT);

final int jobManagerPort = tmpConfig
.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);

config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);

flowStep.logWarn("Using local cluster at " + localCluster.hostname() + " RM port: " + resourceManagerPort + " JM port: " + jobManagerPort);

client = new StandaloneClusterClient(config);
client.setPrintStatusDuringExecution(env.getConfig().isSysoutLoggingEnabled());

} else if (isRemoteExecution()) {
Expand Down Expand Up @@ -206,7 +220,7 @@ else if (FlinkConfigConstants.EXECUTION_MODE_PIPELINED.equals(execMode)) {
final Callable<JobSubmissionResult> callable = new Callable<JobSubmissionResult>() {
@Override
public JobSubmissionResult call() throws Exception {
return client.runBlocking(jobGraph, loader);
return client.run(jobGraph, loader);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,19 @@ public class FlinkPlanner extends FlowPlanner<FlinkFlow, Configuration> {

private List<String> classPath;

private ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
private ExecutionEnvironment env;

public FlinkPlanner(List<String> classPath) {
/**
 * Convenience constructor: plans against the environment returned by
 * {@code ExecutionEnvironment.getExecutionEnvironment()}.
 *
 * @param classPath additional classpath entries shipped with the job
 */
public FlinkPlanner(List<String> classPath) {
    this(ExecutionEnvironment.getExecutionEnvironment(), classPath);
}

public FlinkPlanner(ExecutionEnvironment env, List<String> classPath) {
super();
this.env = env;
this.classPath = classPath;

env.getConfig().disableSysoutLogging();
if (env.getParallelism() <= 0) {
// load the default parallelism from config

GlobalConfiguration.loadConfiguration(new File(CliFrontend.getConfigurationDirectoryFromEnv()).getAbsolutePath());
org.apache.flink.configuration.Configuration configuration = GlobalConfiguration.getConfiguration();
int parallelism = configuration.getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public BoundaryInStage(FlowProcess flowProcess, FlowElement flowElement) {
}

@Override
public void receive(Duct previous, Void v) {
/**
 * Push-based receive is unsupported: this boundary stage produces its
 * tuples itself in run(). The int ordinal parameter matches the
 * three-argument receive signature introduced with Cascading 3.2.
 */
public void receive(Duct previous, int ordinal, Void v) {
throw new UnsupportedOperationException( "use run() instead" );
}

Expand Down Expand Up @@ -80,7 +80,7 @@ public void run(Object input) throws Throwable {
continue;
}

next.receive( this, tupleEntry );
next.receive( this, 0, tupleEntry );

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void setTupleCollector(Collector<Tuple> tupleCollector) {
}

@Override
public void receive(Duct previous, TupleEntry tupleEntry) {
/**
 * Forwards each incoming tuple to the Flink output collector.
 * The ordinal argument (new in the three-arg Cascading 3.2 receive) is unused here.
 */
public void receive(Duct previous, int ordinal, TupleEntry tupleEntry) {
this.tupleCollector.collect(tupleEntry.getTuple());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ public void bind( StreamGraph streamGraph )
if( role != IORole.sink ) {
next = getNextFor(streamGraph);
}

if( role == IORole.sink ) {
setOrdinalMap(streamGraph);
}
}


Expand Down Expand Up @@ -81,7 +77,7 @@ public void start( Duct previous )
}
}

public void receive( Duct previous, TupleEntry incomingEntry ) {
/**
 * This gate is driven by run(), which pulls already-grouped input;
 * the push-based receive entry point is intentionally unsupported.
 */
public void receive( Duct previous, int ordinal, TupleEntry incomingEntry ) {
throw new UnsupportedOperationException("Receive not implemented for CoGroupBufferInGate.");
}

Expand All @@ -107,7 +103,7 @@ public void run(Object input) {

keyEntry.setTuple( this.closure.getGroupTuple(key) );

next.receive( this, grouping );
next.receive( this, 0, grouping );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ public void bind( StreamGraph streamGraph )
if( role != IORole.sink ) {
next = getNextFor(streamGraph);
}

if( role == IORole.sink ) {
setOrdinalMap(streamGraph);
}
}


Expand Down Expand Up @@ -97,7 +93,7 @@ public void start( Duct previous ) {
}
}

public void receive( Duct previous, TupleEntry incomingEntry ) {
/**
 * This gate is driven by run(), which pulls already-grouped input;
 * the push-based receive entry point is intentionally unsupported.
 */
public void receive( Duct previous, int ordinal, TupleEntry incomingEntry ) {
throw new UnsupportedOperationException("Receive not implemented for CoGroupInGate.");
}

Expand All @@ -118,7 +114,7 @@ public void run(Object input) {
tupleEntryIterator.reset(resultIterator);
keyEntry.setTuple( this.closure.getGroupTuple(null) );

next.receive( this, grouping );
next.receive( this, 0, grouping );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ public void bind( StreamGraph streamGraph ) {
if( role != IORole.sink ) {
next = getNextFor(streamGraph);
}

if( role == IORole.sink ) {
setOrdinalMap(streamGraph);
}
}


Expand Down Expand Up @@ -80,7 +76,7 @@ public void start( Duct previous ) {
}
}

public void receive( Duct previous, TupleEntry incomingEntry ) {
/**
 * This gate is driven by run(), which pulls already-grouped input;
 * the push-based receive entry point is intentionally unsupported.
 */
public void receive( Duct previous, int ordinal, TupleEntry incomingEntry ) {
throw new UnsupportedOperationException("Receive not implemented for GroupByInGate.");
}

Expand Down Expand Up @@ -110,7 +106,7 @@ public void run(Object input) {
Tuple groupTuple = keyPeekingIt.peekNextKey();
keyEntry.setTuple( groupTuple );

next.receive( this, grouping );
next.receive( this, 0, grouping );

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ public void prepare() {
}

@Override
public void receive(Duct previous, Tuple2<Tuple, Tuple[]> t) {
public void receive(Duct previous, int ordinal, Tuple2<Tuple, Tuple[]> t) {

closure.reset(t);

entryIterator.reset(joiner.getIterator(closure));

while(entryIterator.hasNext()) {
next.receive(this, entryIterator.next());
next.receive(this, ordinal, entryIterator.next());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public JoinBoundaryInStage(FlowProcess flowProcess, FlowElement flowElement) {
}

@Override
public void receive(Duct previous, Void v) {
/**
 * Push-based receive is unsupported: this join-boundary stage emits its
 * tuples from run() instead (Cascading 3.2 three-argument signature).
 */
public void receive(Duct previous, int ordinal, Void v) {
throw new UnsupportedOperationException( "use run() instead" );
}

Expand Down Expand Up @@ -65,6 +65,6 @@ public void run(Object input) throws Throwable {
flowProcess.increment( StepCounters.Tuples_Read, 1 );
flowProcess.increment( SliceCounters.Tuples_Read, 1 );

next.receive(this, joinInputTuples);
next.receive(this, 0, joinInputTuples);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public JoinBoundaryMapperInStage(FlowProcess flowProcess, FlowElement flowElemen
}

@Override
public void receive(Duct previous, Void v) {
/**
 * Push-based receive is unsupported: this mapper-side join boundary emits
 * its tuples from run() instead (Cascading 3.2 three-argument signature).
 */
public void receive(Duct previous, int ordinal, Void v) {
throw new UnsupportedOperationException( "use run() instead" );
}

Expand Down Expand Up @@ -73,7 +73,7 @@ public void run(Object input) throws Throwable {
continue;
}

next.receive( this, joinListTuple );
next.receive( this, 0, joinListTuple );

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public SinkBoundaryInStage(FlowProcess flowProcess, FlowElement flowElement, Flo
}

@Override
public void receive(Duct previous, Void v) {
/**
 * Push-based receive is unsupported: this sink-side boundary drives the
 * downstream duct from run() instead (Cascading 3.2 three-argument signature).
 */
public void receive(Duct previous, int ordinal, Void v) {
throw new UnsupportedOperationException( "use run() instead" );
}

Expand Down Expand Up @@ -82,7 +82,7 @@ public void run(Object input) throws Throwable {
handleException(new DuctException("internal error", throwable), null);
}

next.receive( this, tupleEntry );
next.receive( this, 0, tupleEntry );

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public SingleOutBoundaryStage(FlowProcess flowProcess, FlowElement flowElement)
}

@Override
public void receive(Duct prev, TupleEntry entry) {
public void receive(Duct prev, int ordinal, TupleEntry entry) {

if(this.nextTuple == null) {
this.nextTuple = entry.getTuple();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public boolean readNextRecord() throws Throwable {
continue;
}

next.receive(this, tupleEntry);
next.receive(this, 0, tupleEntry);
hasNext = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.ClusterClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,7 +31,7 @@ public class AccumulatorCache {

private JobID jobID;

private Client client;
private ClusterClient client;

private volatile Map<String, Object> currentAccumulators = Collections.emptyMap();

Expand Down Expand Up @@ -80,7 +80,7 @@ public void setJobID(JobID jobID) {
this.jobID = jobID;
}

public void setClient(Client client) {
/**
 * Sets the client used to query accumulators for the tracked job.
 * Parameter type follows the Client -> ClusterClient rename in the
 * Flink 1.1 client API (see the flink.version bump in this change).
 */
public void setClient(ClusterClient client) {
this.client = client;
}

Expand Down