Skip to content

Commit

Permalink
For #23 A start to pull things into a common place.
Browse files Browse the repository at this point in the history
  • Loading branch information
cjnolet committed Aug 16, 2014
1 parent 3b10c5e commit 2278c96
Show file tree
Hide file tree
Showing 14 changed files with 307 additions and 262 deletions.
45 changes: 20 additions & 25 deletions src/main/java/org/calrissian/flowmix/bolt/AggregatorBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
*/
package org.calrissian.flowmix.bolt;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
Expand All @@ -23,22 +28,18 @@
import backtype.storm.tuple.Values;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.calrissian.flowmix.model.*;
import org.calrissian.flowmix.model.Flow;
import org.calrissian.flowmix.model.FlowInfo;
import org.calrissian.flowmix.model.Policy;
import org.calrissian.flowmix.model.StreamDef;
import org.calrissian.flowmix.model.event.AggregatedEvent;
import org.calrissian.flowmix.model.op.AggregateOp;
import org.calrissian.flowmix.model.op.FlowOp;
import org.calrissian.flowmix.model.op.PartitionOp;
import org.calrissian.flowmix.model.event.AggregatedEvent;
import org.calrissian.flowmix.support.Aggregator;
import org.calrissian.flowmix.support.window.AggregatorWindow;
import org.calrissian.mango.domain.event.Event;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.apache.commons.lang.StringUtils.join;
import static org.calrissian.flowmix.Constants.*;
import static org.calrissian.flowmix.FlowmixFactory.declareOutputStreams;
import static org.calrissian.flowmix.FlowmixFactory.fields;
import static org.calrissian.flowmix.spout.MockFlowLoaderSpout.FLOW_LOADER_STREAM;
Expand Down Expand Up @@ -109,24 +110,18 @@ public void execute(Tuple tuple) {

} else if(!"tick".equals(tuple.getSourceStreamId())){

String flowId = tuple.getStringByField(FLOW_ID);
Event event = (Event) tuple.getValueByField(EVENT);
int idx = tuple.getIntegerByField(FLOW_OP_IDX);
idx++;
String streamName = tuple.getStringByField(STREAM_NAME);
String previousStream = tuple.getStringByField(LAST_STREAM);
String partition = tuple.getStringByField(PARTITION);
FlowInfo flowInfo = new FlowInfo(tuple);

Flow flow = flows.get(flowId);
Flow flow = flows.get(flowInfo.getFlowId());

if(flow != null) {

AggregateOp op = (AggregateOp) flow.getStream(streamName).getFlowOps().get(idx);
Cache<String, AggregatorWindow> windowCache = windows.get(flowId + "\0" + streamName + "\0" + idx);
AggregateOp op = (AggregateOp) flow.getStream(flowInfo.getStreamName()).getFlowOps().get(flowInfo.getIdx());
Cache<String, AggregatorWindow> windowCache = windows.get(flowInfo.getFlowId() + "\0" + flowInfo.getStreamName() + "\0" + flowInfo.getIdx());

AggregatorWindow window = null;
if(windowCache != null) {
window = windowCache.getIfPresent(partition);
window = windowCache.getIfPresent(flowInfo.getPartition());
if(window != null) { // if we have a window already constructed, process it

/**
Expand All @@ -135,15 +130,15 @@ public void execute(Tuple tuple) {
if(op.getEvictionPolicy() == Policy.TIME)
window.timeEvict(op.getEvictionThreshold());
} else {
window = buildWindow(op, streamName, idx, partition, flowId, windowCache);
window = buildWindow(op, flowInfo.getStreamName(), flowInfo.getIdx(), flowInfo.getPartition(), flowInfo.getFlowId(), windowCache);
}
} else {
windowCache = CacheBuilder.newBuilder().expireAfterWrite(op.getWindowEvictMillis(), TimeUnit.MILLISECONDS).build();
window = buildWindow(op, streamName, idx, partition, flowId, windowCache);
window = buildWindow(op, flowInfo.getStreamName(), flowInfo.getIdx(), flowInfo.getPartition(), flowInfo.getFlowId(), windowCache);
}

window.add(event, previousStream);
windowCache.put(partition, window); // window eviction is on writes, so we need to write to the window to reset our expiration.
window.add(flowInfo.getEvent(), flowInfo.getPreviousStream());
windowCache.put(flowInfo.getPartition(), window); // window eviction is on writes, so we need to write to the window to reset our expiration.

/**
* Perform count-based trigger if necessary
Expand All @@ -152,7 +147,7 @@ public void execute(Tuple tuple) {
window.incrTriggerTicks();

if(window.getTriggerTicks() == op.getTriggerThreshold())
emitAggregate(flow, op, streamName, idx, window);
emitAggregate(flow, op, flowInfo.getStreamName(), flowInfo.getIdx(), window);
}
}
}
Expand Down
42 changes: 20 additions & 22 deletions src/main/java/org/calrissian/flowmix/bolt/EachBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,24 @@
*/
package org.calrissian.flowmix.bolt;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.calrissian.flowmix.FlowmixFactory;
import org.calrissian.flowmix.model.op.EachOp;
import org.calrissian.flowmix.model.Flow;
import org.calrissian.flowmix.model.FlowInfo;
import org.calrissian.flowmix.model.op.EachOp;
import org.calrissian.flowmix.support.Utils;
import org.calrissian.mango.domain.event.Event;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.calrissian.flowmix.Constants.*;
import static org.calrissian.flowmix.FlowmixFactory.fields;
import static org.calrissian.flowmix.spout.MockFlowLoaderSpout.FLOW_LOADER_STREAM;

Expand All @@ -56,35 +57,32 @@ public void execute(Tuple tuple) {
for(Flow flow : (Collection<Flow>)tuple.getValue(0))
flows.put(flow.getId(), flow);
} else if(!"tick".equals(tuple.getSourceStreamId())){
String flowId = tuple.getStringByField(FLOW_ID);
Event event = (Event) tuple.getValueByField(EVENT);
int idx = tuple.getIntegerByField(FLOW_OP_IDX);
idx++;
String streamName = tuple.getStringByField(STREAM_NAME);
String previousStream = tuple.getStringByField(LAST_STREAM);

Flow flow = flows.get(flowId);
FlowInfo flowInfo = new FlowInfo(tuple);

Flow flow = flows.get(flowInfo.getFlowId());

if(flow != null) {
EachOp functionOp = (EachOp) flow.getStream(streamName).getFlowOps().get(idx);
EachOp functionOp = (EachOp) flow.getStream(flowInfo.getStreamName()).getFlowOps().get(flowInfo.getIdx());

String nextStream = Utils.getNextStreamFromFlowInfo(flowInfo, flow);

String nextStream = idx+1 < flow.getStream(streamName).getFlowOps().size() ? flow.getStream(streamName).getFlowOps().get(idx + 1).getComponentName() : "output";
List<Event> events = functionOp.getFunction().execute(event);
List<Event> events = functionOp.getFunction().execute(flowInfo.getEvent());

if((nextStream.equals("output") && flow.getStream(streamName).isStdOutput()) || !nextStream.equals("output")) {
if((nextStream.equals("output") && flow.getStream(flowInfo.getStreamName()).isStdOutput()) || !nextStream.equals("output")) {
if (events != null) {
for (Event newEvent : events)
collector.emit(nextStream, tuple, new Values(flowId, newEvent, idx, streamName, previousStream));
collector.emit(nextStream, tuple, new Values(flowInfo.getFlowId(), newEvent, flowInfo.getIdx(), flowInfo.getStreamName(), flowInfo.getPreviousStream()));
}
}

// send directly to any non std output streams that may be configured
if (nextStream.equals("output") && flow.getStream(streamName).getOutputs() != null) {
for (String output : flow.getStream(streamName).getOutputs()) {
if (nextStream.equals("output") && flow.getStream(flowInfo.getStreamName()).getOutputs() != null) {
for (String output : flow.getStream(flowInfo.getStreamName()).getOutputs()) {
if (events != null) {
for (Event newEvent : events) {
String outputStream = flow.getStream(output).getFlowOps().get(0).getComponentName();
collector.emit(outputStream, tuple, new Values(flowId, newEvent, -1, output, streamName));
collector.emit(outputStream, tuple, new Values(flowInfo.getFlowId(), newEvent, -1, output, flowInfo.getStreamName()));
}
}
}
Expand Down
42 changes: 13 additions & 29 deletions src/main/java/org/calrissian/flowmix/bolt/FilterBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,23 @@
*/
package org.calrissian.flowmix.bolt;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.calrissian.flowmix.model.op.FilterOp;
import org.calrissian.flowmix.model.Flow;
import org.calrissian.mango.domain.event.Event;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.calrissian.flowmix.model.FlowInfo;
import org.calrissian.flowmix.model.op.FilterOp;

import static org.calrissian.flowmix.Constants.*;
import static org.calrissian.flowmix.FlowmixFactory.declareOutputStreams;
import static org.calrissian.flowmix.FlowmixFactory.fields;
import static org.calrissian.flowmix.spout.MockFlowLoaderSpout.FLOW_LOADER_STREAM;
import static org.calrissian.flowmix.support.Utils.emitNext;

public class FilterBolt extends BaseRichBolt {

Expand All @@ -52,32 +51,17 @@ public void execute(Tuple tuple) {
for(Flow flow : (Collection<Flow>)tuple.getValue(0))
flows.put(flow.getId(), flow);
} else if(!"tick".equals(tuple.getSourceStreamId())){
String flowId = tuple.getStringByField(FLOW_ID);
Event event = (Event) tuple.getValueByField(EVENT);
int idx = tuple.getIntegerByField(FLOW_OP_IDX);
idx++;
String streamName = tuple.getStringByField(STREAM_NAME);
String previousStream = tuple.getStringByField(LAST_STREAM);

Flow flow = flows.get(flowId);
FlowInfo flowInfo = new FlowInfo(tuple);

if(flow != null) {
FilterOp filterOp = (FilterOp) flow.getStream(streamName).getFlowOps().get(idx);

String nextStream = idx+1 < flow.getStream(streamName).getFlowOps().size() ? flow.getStream(streamName).getFlowOps().get(idx + 1).getComponentName() : "output";
Flow flow = flows.get(flowInfo.getFlowId());

if(filterOp.getFilter().accept(event)) {
if(flow != null) {

if((nextStream.equals("output") && flow.getStream(streamName).isStdOutput()) || !nextStream.equals("output"))
collector.emit(nextStream, tuple, new Values(flowId, event, idx, streamName, previousStream));
FilterOp filterOp = (FilterOp) flow.getStream(flowInfo.getStreamName()).getFlowOps().get(flowInfo.getIdx());

// send directly to any non standard output streams that may be configured
if(nextStream.equals("output") && flow.getStream(streamName).getOutputs() != null) {
for (String output : flow.getStream(streamName).getOutputs()) {
String outputStream = flow.getStream(output).getFlowOps().get(0).getComponentName();
collector.emit(outputStream, tuple, new Values(flowId, event, -1, output, streamName));
}
}
if(filterOp.getFilter().accept(flowInfo.getEvent())) {
emitNext(tuple, flowInfo, flow, collector);
}
}
}
Expand Down
66 changes: 34 additions & 32 deletions src/main/java/org/calrissian/flowmix/bolt/JoinBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
package org.calrissian.flowmix.bolt;


import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
Expand All @@ -24,19 +31,19 @@
import backtype.storm.tuple.Values;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.calrissian.flowmix.model.*;
import org.calrissian.flowmix.model.Flow;
import org.calrissian.flowmix.model.FlowInfo;
import org.calrissian.flowmix.model.Policy;
import org.calrissian.flowmix.model.StreamDef;
import org.calrissian.flowmix.model.op.FlowOp;
import org.calrissian.flowmix.model.op.JoinOp;
import org.calrissian.flowmix.support.Utils;
import org.calrissian.flowmix.support.window.Window;
import org.calrissian.flowmix.support.window.WindowItem;
import org.calrissian.mango.domain.event.BaseEvent;
import org.calrissian.mango.domain.event.Event;

import java.util.*;
import java.util.concurrent.TimeUnit;

import static com.google.common.collect.Iterables.concat;
import static org.calrissian.flowmix.Constants.*;
import static org.calrissian.flowmix.FlowmixFactory.declareOutputStreams;
import static org.calrissian.flowmix.FlowmixFactory.fields;
import static org.calrissian.flowmix.spout.MockFlowLoaderSpout.FLOW_LOADER_STREAM;
Expand Down Expand Up @@ -149,25 +156,19 @@ public void execute(Tuple tuple) {
*/
if (rulesMap.size() > 0) {

String ruleId = tuple.getStringByField(FLOW_ID);
String hash = tuple.contains(PARTITION) ? tuple.getStringByField(PARTITION) : "";
Event event = (Event) tuple.getValueByField(EVENT);
int idx = tuple.getIntegerByField(FLOW_OP_IDX);
idx++;
FlowInfo flowInfo = new FlowInfo(tuple);

String streamName = tuple.getStringByField(STREAM_NAME);
String previousStream = tuple.getStringByField(LAST_STREAM);
Flow flow = rulesMap.get(ruleId);
Flow flow = rulesMap.get(flowInfo.getFlowId());

JoinOp op = (JoinOp) flow.getStream(streamName).getFlowOps().get(idx);
JoinOp op = (JoinOp) flow.getStream(flowInfo.getStreamName()).getFlowOps().get(flowInfo.getIdx());

// do processing on lhs
if(previousStream.equals(op.getLeftStream())) {
if(flowInfo.getPreviousStream().equals(op.getLeftStream())) {

Cache<String, Window> buffersForRule = windows.get(flow.getId() + "\0" + streamName + "\0" + idx);
Cache<String, Window> buffersForRule = windows.get(flow.getId() + "\0" + flowInfo.getStreamName() + "\0" + flowInfo.getIdx());
Window buffer;
if (buffersForRule != null) {
buffer = buffersForRule.getIfPresent(hash);
buffer = buffersForRule.getIfPresent(flowInfo.getPartition());

if (buffer != null) { // if we have a buffer already, process it
/**
Expand All @@ -178,37 +179,38 @@ public void execute(Tuple tuple) {
}
} else {
buffersForRule = CacheBuilder.newBuilder().expireAfterAccess(60, TimeUnit.MINUTES).build(); // just in case we get some rogue data, we don't want it to sit for too long.
buffer = op.getEvictionPolicy() == Policy.TIME ? new Window(hash) :
new Window(hash, op.getEvictionThreshold());
buffersForRule.put(hash, buffer);
windows.put(flow.getId() + "\0" + streamName + "\0" + idx, buffersForRule);
buffer = op.getEvictionPolicy() == Policy.TIME ? new Window(flowInfo.getPartition()) :
new Window(flowInfo.getPartition(), op.getEvictionThreshold());
buffersForRule.put(flowInfo.getPartition(), buffer);
windows.put(flow.getId() + "\0" + flowInfo.getStreamName() + "\0" + flowInfo.getIdx(), buffersForRule);
}

buffer.add(event, previousStream);
buffer.add(flowInfo.getEvent(), flowInfo.getPreviousStream());

} else if(previousStream.equals(op.getRightStream())) {
} else if(flowInfo.getPreviousStream().equals(op.getRightStream())) {

Cache<String, Window> buffersForRule = windows.get(flow.getId() + "\0" + streamName + "\0" + idx);
Cache<String, Window> buffersForRule = windows.get(flow.getId() + "\0" + flowInfo.getStreamName() + "\0" + flowInfo.getIdx());
Window buffer;
if (buffersForRule != null) {
buffer = buffersForRule.getIfPresent(hash);
buffer = buffersForRule.getIfPresent(flowInfo.getPartition());

for(WindowItem bufferedEvent : buffer.getEvents()) {
Event joined = new BaseEvent(bufferedEvent.getEvent().getId(), bufferedEvent.getEvent().getTimestamp());

// the hashcode will filter duplicates
joined.putAll(concat(bufferedEvent.getEvent().getTuples()));
joined.putAll(concat(event.getTuples()));
String nextStream = idx+1 < flow.getStream(streamName).getFlowOps().size() ? flow.getStream(streamName).getFlowOps().get(idx+1).getComponentName() : "output";
joined.putAll(concat(flowInfo.getEvent().getTuples()));
String nextStream = Utils.getNextStreamFromFlowInfo(flowInfo, flow);

if((nextStream.equals("output") && flow.getStream(streamName).isStdOutput()) || !nextStream.equals("output"))
collector.emit(nextStream, new Values(flow.getId(), joined, idx, streamName, bufferedEvent.getPreviousStream()));
if((nextStream.equals("output") && flow.getStream(flowInfo.getStreamName()).isStdOutput()) || !nextStream.equals("output"))
collector.emit(nextStream, new Values(flow.getId(), joined, flowInfo.getIdx(), flowInfo.getStreamName(), bufferedEvent.getPreviousStream()));

// send to any other streams that are configured (aside from output)
if(nextStream.equals("output")) {
if(flow.getStream(streamName).getOutputs() != null) {
for(String output : flow.getStream(streamName).getOutputs()) {
if(flow.getStream(flowInfo.getStreamName()).getOutputs() != null) {
for(String output : flow.getStream(flowInfo.getStreamName()).getOutputs()) {
String outputComponent = flow.getStream(output).getFlowOps().get(0).getComponentName();
collector.emit(outputComponent, new Values(flow.getId(), joined, -1, output, streamName));
collector.emit(outputComponent, new Values(flow.getId(), joined, -1, output, flowInfo.getStreamName()));
}
}
}
Expand Down
Loading

0 comments on commit 2278c96

Please sign in to comment.