diff --git a/src/main/java/org/calrissian/flowmix/bolt/AggregatorBolt.java b/src/main/java/org/calrissian/flowmix/bolt/AggregatorBolt.java index 324fa46..f739fd0 100644 --- a/src/main/java/org/calrissian/flowmix/bolt/AggregatorBolt.java +++ b/src/main/java/org/calrissian/flowmix/bolt/AggregatorBolt.java @@ -15,6 +15,11 @@ */ package org.calrissian.flowmix.bolt; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; @@ -23,22 +28,18 @@ import backtype.storm.tuple.Values; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import org.calrissian.flowmix.model.*; +import org.calrissian.flowmix.model.Flow; +import org.calrissian.flowmix.model.FlowInfo; +import org.calrissian.flowmix.model.Policy; +import org.calrissian.flowmix.model.StreamDef; +import org.calrissian.flowmix.model.event.AggregatedEvent; import org.calrissian.flowmix.model.op.AggregateOp; import org.calrissian.flowmix.model.op.FlowOp; import org.calrissian.flowmix.model.op.PartitionOp; -import org.calrissian.flowmix.model.event.AggregatedEvent; import org.calrissian.flowmix.support.Aggregator; import org.calrissian.flowmix.support.window.AggregatorWindow; -import org.calrissian.mango.domain.event.Event; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; import static org.apache.commons.lang.StringUtils.join; -import static org.calrissian.flowmix.Constants.*; import static org.calrissian.flowmix.FlowmixFactory.declareOutputStreams; import static org.calrissian.flowmix.FlowmixFactory.fields; import static org.calrissian.flowmix.spout.MockFlowLoaderSpout.FLOW_LOADER_STREAM; @@ -109,24 +110,18 @@ public void execute(Tuple tuple) { } else if(!"tick".equals(tuple.getSourceStreamId())){ - String flowId = tuple.getStringByField(FLOW_ID); - Event event = (Event) tuple.getValueByField(EVENT); - int idx = tuple.getIntegerByField(FLOW_OP_IDX); - idx++; - String streamName = tuple.getStringByField(STREAM_NAME); - String previousStream = tuple.getStringByField(LAST_STREAM); - String partition = tuple.getStringByField(PARTITION); + FlowInfo flowInfo = new FlowInfo(tuple); - Flow flow = flows.get(flowId); + Flow flow = flows.get(flowInfo.getFlowId()); if(flow != null) { - AggregateOp op = (AggregateOp) flow.getStream(streamName).getFlowOps().get(idx); - Cache windowCache = windows.get(flowId + "\0" + streamName + "\0" + idx); + AggregateOp op = (AggregateOp) flow.getStream(flowInfo.getStreamName()).getFlowOps().get(flowInfo.getIdx()); + Cache windowCache = windows.get(flowInfo.getFlowId() + "\0" + flowInfo.getStreamName() + "\0" + flowInfo.getIdx()); AggregatorWindow window = null; if(windowCache != null) { - window = windowCache.getIfPresent(partition); + window = windowCache.getIfPresent(flowInfo.getPartition()); if(window != null) { // if we have a window already constructed, proces it /** @@ -135,15 +130,15 @@ public void execute(Tuple tuple) { if(op.getEvictionPolicy() == Policy.TIME) window.timeEvict(op.getEvictionThreshold()); } else { - window = buildWindow(op, streamName, idx, partition, flowId, windowCache); + window = buildWindow(op, flowInfo.getStreamName(), flowInfo.getIdx(), flowInfo.getPartition(), flowInfo.getFlowId(), windowCache); } } else { windowCache = CacheBuilder.newBuilder().expireAfterWrite(op.getWindowEvictMillis(), TimeUnit.MILLISECONDS).build(); - window = buildWindow(op, streamName, idx, partition, flowId, windowCache); + window = buildWindow(op, flowInfo.getStreamName(), flowInfo.getIdx(), flowInfo.getPartition(), flowInfo.getFlowId(), windowCache); } - window.add(event, previousStream); - windowCache.put(partition, window); // window eviction is on writes, so we need to write to the window to reset our expiration. + window.add(flowInfo.getEvent(), flowInfo.getPreviousStream()); + windowCache.put(flowInfo.getPartition(), window); // window eviction is on writes, so we need to write to the window to reset our expiration. /** * Perform count-based trigger if necessary @@ -152,7 +147,7 @@ public void execute(Tuple tuple) { window.incrTriggerTicks(); if(window.getTriggerTicks() == op.getTriggerThreshold()) - emitAggregate(flow, op, streamName, idx, window); + emitAggregate(flow, op, flowInfo.getStreamName(), flowInfo.getIdx(), window); } } } diff --git a/src/main/java/org/calrissian/flowmix/bolt/EachBolt.java b/src/main/java/org/calrissian/flowmix/bolt/EachBolt.java index 62c0ea4..cb73760 100644 --- a/src/main/java/org/calrissian/flowmix/bolt/EachBolt.java +++ b/src/main/java/org/calrissian/flowmix/bolt/EachBolt.java @@ -15,6 +15,11 @@ */ package org.calrissian.flowmix.bolt; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; @@ -22,16 +27,12 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import org.calrissian.flowmix.FlowmixFactory; -import org.calrissian.flowmix.model.op.EachOp; import org.calrissian.flowmix.model.Flow; +import org.calrissian.flowmix.model.FlowInfo; +import org.calrissian.flowmix.model.op.EachOp; +import org.calrissian.flowmix.support.Utils; import org.calrissian.mango.domain.event.Event; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.calrissian.flowmix.Constants.*; import static org.calrissian.flowmix.FlowmixFactory.fields; import static org.calrissian.flowmix.spout.MockFlowLoaderSpout.FLOW_LOADER_STREAM; @@ -56,35 +57,32 @@ public void execute(Tuple tuple) { for(Flow flow : (Collection)tuple.getValue(0)) flows.put(flow.getId(), flow); } else if(!"tick".equals(tuple.getSourceStreamId())){ - String flowId = tuple.getStringByField(FLOW_ID); - Event event = (Event) tuple.getValueByField(EVENT); - int idx = tuple.getIntegerByField(FLOW_OP_IDX); - idx++; - String streamName = tuple.getStringByField(STREAM_NAME); - String previousStream = tuple.getStringByField(LAST_STREAM); - Flow flow = flows.get(flowId); + FlowInfo flowInfo = new FlowInfo(tuple); + + Flow flow = flows.get(flowInfo.getFlowId()); if(flow != null) { - EachOp functionOp = (EachOp) flow.getStream(streamName).getFlowOps().get(idx); + EachOp functionOp = (EachOp) flow.getStream(flowInfo.getStreamName()).getFlowOps().get(flowInfo.getIdx()); + + String nextStream = Utils.getNextStreamFromFlowInfo(flowInfo, flow); - String nextStream = idx+1 < flow.getStream(streamName).getFlowOps().size() ? flow.getStream(streamName).getFlowOps().get(idx + 1).getComponentName() : "output"; - List events = functionOp.getFunction().execute(event); + List events = functionOp.getFunction().execute(flowInfo.getEvent()); - if((nextStream.equals("output") && flow.getStream(streamName).isStdOutput()) || !nextStream.equals("output")) { + if((nextStream.equals("output") && flow.getStream(flowInfo.getStreamName()).isStdOutput()) || !nextStream.equals("output")) { if (events != null) { for (Event newEvent : events) - collector.emit(nextStream, tuple, new Values(flowId, newEvent, idx, streamName, previousStream)); + collector.emit(nextStream, tuple, new Values(flowInfo.getFlowId(), newEvent, flowInfo.getIdx(), flowInfo.getStreamName(), flowInfo.getPreviousStream())); } } // send directly to any non std output streams that may be configured - if (nextStream.equals("output") && flow.getStream(streamName).getOutputs() != null) { - for (String output : flow.getStream(streamName).getOutputs()) { + if (nextStream.equals("output") && flow.getStream(flowInfo.getStreamName()).getOutputs() != null) { + for (String output : flow.getStream(flowInfo.getStreamName()).getOutputs()) { if (events != null) { for (Event newEvent : events) { String outputStream = flow.getStream(output).getFlowOps().get(0).getComponentName(); - collector.emit(outputStream, tuple, new Values(flowId, newEvent, -1, output, streamName)); + collector.emit(outputStream, tuple, new Values(flowInfo.getFlowId(), newEvent, -1, output, flowInfo.getStreamName())); } } } diff --git a/src/main/java/org/calrissian/flowmix/bolt/FilterBolt.java b/src/main/java/org/calrissian/flowmix/bolt/FilterBolt.java index b85fbdb..bc9a0a7 100644 --- a/src/main/java/org/calrissian/flowmix/bolt/FilterBolt.java +++ b/src/main/java/org/calrissian/flowmix/bolt/FilterBolt.java @@ -15,24 +15,23 @@ */ package org.calrissian.flowmix.bolt; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import org.calrissian.flowmix.model.op.FilterOp; import org.calrissian.flowmix.model.Flow; -import org.calrissian.mango.domain.event.Event; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; +import org.calrissian.flowmix.model.FlowInfo; +import org.calrissian.flowmix.model.op.FilterOp; -import static org.calrissian.flowmix.Constants.*; import static org.calrissian.flowmix.FlowmixFactory.declareOutputStreams; import static org.calrissian.flowmix.FlowmixFactory.fields; import static org.calrissian.flowmix.spout.MockFlowLoaderSpout.FLOW_LOADER_STREAM; +import static org.calrissian.flowmix.support.Utils.emitNext; public class FilterBolt extends BaseRichBolt { @@ -52,32 +51,17 @@ public void execute(Tuple tuple) { for(Flow flow : (Collection)tuple.getValue(0)) flows.put(flow.getId(), flow); } else if(!"tick".equals(tuple.getSourceStreamId())){ - String flowId = tuple.getStringByField(FLOW_ID); - Event event = (Event) tuple.getValueByField(EVENT); - int idx = tuple.getIntegerByField(FLOW_OP_IDX); - idx++; - String streamName = tuple.getStringByField(STREAM_NAME); - String previousStream = tuple.getStringByField(LAST_STREAM); - Flow flow = flows.get(flowId); + FlowInfo flowInfo = new FlowInfo(tuple); - if(flow != null) { - FilterOp filterOp = (FilterOp) flow.getStream(streamName).getFlowOps().get(idx); - - String nextStream = idx+1 < flow.getStream(streamName).getFlowOps().size() ? flow.getStream(streamName).getFlowOps().get(idx + 1).getComponentName() : "output"; + Flow flow = flows.get(flowInfo.getFlowId()); - if(filterOp.getFilter().accept(event)) { + if(flow != null) { - if((nextStream.equals("output") && flow.getStream(streamName).isStdOutput()) || !nextStream.equals("output")) - collector.emit(nextStream, tuple, new Values(flowId, event, idx, streamName, previousStream)); + FilterOp filterOp = (FilterOp) flow.getStream(flowInfo.getStreamName()).getFlowOps().get(flowInfo.getIdx()); - // send directly to any non standard output streams that may be configured - if(nextStream.equals("output") && flow.getStream(streamName).getOutputs() != null) { - for (String output : flow.getStream(streamName).getOutputs()) { - String outputStream = flow.getStream(output).getFlowOps().get(0).getComponentName(); - collector.emit(outputStream, tuple, new Values(flowId, event, -1, output, streamName)); - } - } + if(filterOp.getFilter().accept(flowInfo.getEvent())) { + emitNext(tuple, flowInfo, flow, collector); } } } diff --git a/src/main/java/org/calrissian/flowmix/bolt/JoinBolt.java b/src/main/java/org/calrissian/flowmix/bolt/JoinBolt.java index 64e3924..b968a81 100644 --- a/src/main/java/org/calrissian/flowmix/bolt/JoinBolt.java +++ b/src/main/java/org/calrissian/flowmix/bolt/JoinBolt.java @@ -16,6 +16,13 @@ package org.calrissian.flowmix.bolt; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; @@ -24,19 +31,19 @@ import backtype.storm.tuple.Values; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import org.calrissian.flowmix.model.*; +import org.calrissian.flowmix.model.Flow; +import org.calrissian.flowmix.model.FlowInfo; +import org.calrissian.flowmix.model.Policy; +import org.calrissian.flowmix.model.StreamDef; import org.calrissian.flowmix.model.op.FlowOp; import org.calrissian.flowmix.model.op.JoinOp; +import org.calrissian.flowmix.support.Utils; import org.calrissian.flowmix.support.window.Window; import org.calrissian.flowmix.support.window.WindowItem; import org.calrissian.mango.domain.event.BaseEvent; import org.calrissian.mango.domain.event.Event; -import java.util.*; -import java.util.concurrent.TimeUnit; - import static com.google.common.collect.Iterables.concat; -import static org.calrissian.flowmix.Constants.*; import static org.calrissian.flowmix.FlowmixFactory.declareOutputStreams; import static org.calrissian.flowmix.FlowmixFactory.fields; import static org.calrissian.flowmix.spout.MockFlowLoaderSpout.FLOW_LOADER_STREAM; @@ -149,25 +156,19 @@ public void execute(Tuple tuple) { */ if (rulesMap.size() > 0) { - String ruleId = tuple.getStringByField(FLOW_ID); - String hash = tuple.contains(PARTITION) ? tuple.getStringByField(PARTITION) : ""; - Event event = (Event) tuple.getValueByField(EVENT); - int idx = tuple.getIntegerByField(FLOW_OP_IDX); - idx++; + FlowInfo flowInfo = new FlowInfo(tuple); - String streamName = tuple.getStringByField(STREAM_NAME); - String previousStream = tuple.getStringByField(LAST_STREAM); - Flow flow = rulesMap.get(ruleId); + Flow flow = rulesMap.get(flowInfo.getFlowId()); - JoinOp op = (JoinOp) flow.getStream(streamName).getFlowOps().get(idx); + JoinOp op = (JoinOp) flow.getStream(flowInfo.getStreamName()).getFlowOps().get(flowInfo.getIdx()); // do processing on lhs - if(previousStream.equals(op.getLeftStream())) { + if(flowInfo.getPreviousStream().equals(op.getLeftStream())) { - Cache buffersForRule = windows.get(flow.getId() + "\0" + streamName + "\0" + idx); + Cache buffersForRule = windows.get(flow.getId() + "\0" + flowInfo.getStreamName() + "\0" + flowInfo.getIdx()); Window buffer; if (buffersForRule != null) { - buffer = buffersForRule.getIfPresent(hash); + buffer = buffersForRule.getIfPresent(flowInfo.getPartition()); if (buffer != null) { // if we have a buffer already, process it /** @@ -178,37 +179,38 @@ public void execute(Tuple tuple) { } } else { buffersForRule = CacheBuilder.newBuilder().expireAfterAccess(60, TimeUnit.MINUTES).build(); // just in case we get some rogue data, we don't wan ti to sit for too long. - buffer = op.getEvictionPolicy() == Policy.TIME ? new Window(hash) : - new Window(hash, op.getEvictionThreshold()); - buffersForRule.put(hash, buffer); - windows.put(flow.getId() + "\0" + streamName + "\0" + idx, buffersForRule); + buffer = op.getEvictionPolicy() == Policy.TIME ? new Window(flowInfo.getPartition()) : + new Window(flowInfo.getPartition(), op.getEvictionThreshold()); + buffersForRule.put(flowInfo.getPartition(), buffer); + windows.put(flow.getId() + "\0" + flowInfo.getStreamName() + "\0" + flowInfo.getIdx(), buffersForRule); } - buffer.add(event, previousStream); + buffer.add(flowInfo.getEvent(), flowInfo.getPreviousStream()); - } else if(previousStream.equals(op.getRightStream())) { + } else if(flowInfo.getPreviousStream().equals(op.getRightStream())) { - Cache buffersForRule = windows.get(flow.getId() + "\0" + streamName + "\0" + idx); + Cache buffersForRule = windows.get(flow.getId() + "\0" + flowInfo.getStreamName() + "\0" + flowInfo.getIdx()); Window buffer; if (buffersForRule != null) { - buffer = buffersForRule.getIfPresent(hash); + buffer = buffersForRule.getIfPresent(flowInfo.getPartition()); for(WindowItem bufferedEvent : buffer.getEvents()) { Event joined = new BaseEvent(bufferedEvent.getEvent().getId(), bufferedEvent.getEvent().getTimestamp()); + // the hashcode will filter duplicates joined.putAll(concat(bufferedEvent.getEvent().getTuples())); - joined.putAll(concat(event.getTuples())); - String nextStream = idx+1 < flow.getStream(streamName).getFlowOps().size() ? flow.getStream(streamName).getFlowOps().get(idx+1).getComponentName() : "output"; + joined.putAll(concat(flowInfo.getEvent().getTuples())); + String nextStream = Utils.getNextStreamFromFlowInfo(flowInfo, flow); - if((nextStream.equals("output") && flow.getStream(streamName).isStdOutput()) || !nextStream.equals("output")) - collector.emit(nextStream, new Values(flow.getId(), joined, idx, streamName, bufferedEvent.getPreviousStream())); + if((nextStream.equals("output") && flow.getStream(flowInfo.getStreamName()).isStdOutput()) || !nextStream.equals("output")) + collector.emit(nextStream, new Values(flow.getId(), joined, flowInfo.getIdx(), flowInfo.getStreamName(), bufferedEvent.getPreviousStream())); // send to any other streams that are configured (aside from output) if(nextStream.equals("output")) { - if(flow.getStream(streamName).getOutputs() != null) { - for(String output : flow.getStream(streamName).getOutputs()) { + if(flow.getStream(flowInfo.getStreamName()).getOutputs() != null) { + for(String output : flow.getStream(flowInfo.getStreamName()).getOutputs()) { String outputComponent = flow.getStream(output).getFlowOps().get(0).getComponentName(); - collector.emit(outputComponent, new Values(flow.getId(), joined, -1, output, streamName)); + collector.emit(outputComponent, new Values(flow.getId(), joined, -1, output, flowInfo.getStreamName())); } } } diff --git a/src/main/java/org/calrissian/flowmix/bolt/PartitionBolt.java b/src/main/java/org/calrissian/flowmix/bolt/PartitionBolt.java index 3bd4b3d..36b4199 100644 --- a/src/main/java/org/calrissian/flowmix/bolt/PartitionBolt.java +++ b/src/main/java/org/calrissian/flowmix/bolt/PartitionBolt.java @@ -15,6 +15,10 @@ */ package org.calrissian.flowmix.bolt; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; @@ -22,14 +26,10 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import org.calrissian.flowmix.model.Flow; +import org.calrissian.flowmix.model.FlowInfo; import org.calrissian.flowmix.model.op.PartitionOp; -import org.calrissian.mango.domain.event.Event; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; +import org.calrissian.flowmix.support.Utils; -import static org.calrissian.flowmix.Constants.*; import static org.calrissian.flowmix.FlowmixFactory.declareOutputStreams; import static org.calrissian.flowmix.FlowmixFactory.partitionFields; import static org.calrissian.flowmix.spout.MockFlowLoaderSpout.FLOW_LOADER_STREAM; @@ -54,30 +54,25 @@ public void execute(Tuple tuple) { flows.put(flow.getId(), flow); } else if(!"tick".equals(tuple.getSourceStreamId())) { - String flowId = tuple.getStringByField(FLOW_ID); - Event event = (Event) tuple.getValueByField(EVENT); - int idx = tuple.getIntegerByField(FLOW_OP_IDX); - String streamName = tuple.getStringByField(STREAM_NAME); - String previousStream = tuple.getStringByField(LAST_STREAM); - idx++; + FlowInfo flowInfo = new FlowInfo(tuple); - Flow flow = flows.get(flowId); + Flow flow = flows.get(flowInfo.getFlowId()); if(flow != null) { - PartitionOp partitionOp = (PartitionOp) flow.getStream(streamName).getFlowOps().get(idx); + PartitionOp partitionOp = (PartitionOp) flow.getStream(flowInfo.getStreamName()).getFlowOps().get(flowInfo.getIdx()); - String nextStream = idx+1 < flow.getStream(streamName).getFlowOps().size() ? flow.getStream(streamName).getFlowOps().get(idx + 1).getComponentName() : "output"; - String hash = buildKeyIndexForEvent(flowId, event, partitionOp.getFields()); + String nextStream = Utils.getNextStreamFromFlowInfo(flowInfo, flow); + String hash = buildKeyIndexForEvent(flowInfo.getFlowId(), flowInfo.getEvent(), partitionOp.getFields()); - if((nextStream.equals("output") && flow.getStream(streamName).isStdOutput()) || !nextStream.equals("output")) - collector.emit(nextStream, tuple, new Values(flowId, event, idx, streamName, hash, previousStream)); + if((nextStream.equals("output") && flow.getStream(flowInfo.getStreamName()).isStdOutput()) || !nextStream.equals("output")) + collector.emit(nextStream, tuple, new Values(flowInfo.getFlowId(), flowInfo.getEvent(), flowInfo.getIdx(), flowInfo.getStreamName(), hash, flowInfo.getPreviousStream())); // send directly to any other non std output streams that may be configured - if(nextStream.equals("output") && flow.getStream(streamName).getOutputs() != null) { - for (String output : flow.getStream(streamName).getOutputs()) { + if(nextStream.equals("output") && flow.getStream(flowInfo.getStreamName()).getOutputs() != null) { + for (String output : flow.getStream(flowInfo.getStreamName()).getOutputs()) { String outputStream = flow.getStream(output).getFlowOps().get(0).getComponentName(); - collector.emit(outputStream, tuple, new Values(flowId, event, -1, output, streamName)); + collector.emit(outputStream, tuple, new Values(flowInfo.getFlowId(), flowInfo.getEvent(), -1, output, hash, flowInfo.getStreamName())); } } } diff --git a/src/main/java/org/calrissian/flowmix/bolt/SelectorBolt.java b/src/main/java/org/calrissian/flowmix/bolt/SelectorBolt.java index 05d06e8..0edd6c5 100644 --- a/src/main/java/org/calrissian/flowmix/bolt/SelectorBolt.java +++ b/src/main/java/org/calrissian/flowmix/bolt/SelectorBolt.java @@ -15,6 +15,10 @@ */ package org.calrissian.flowmix.bolt; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; @@ -23,13 +27,12 @@ import backtype.storm.tuple.Values; import org.calrissian.flowmix.FlowmixFactory; import org.calrissian.flowmix.model.Flow; +import org.calrissian.flowmix.model.FlowInfo; import org.calrissian.flowmix.model.op.SelectOp; +import org.calrissian.flowmix.support.Utils; import org.calrissian.mango.domain.event.BaseEvent; import org.calrissian.mango.domain.event.Event; -import java.util.*; - -import static org.calrissian.flowmix.Constants.*; import static org.calrissian.flowmix.FlowmixFactory.fields; import static org.calrissian.flowmix.spout.MockFlowLoaderSpout.FLOW_LOADER_STREAM; @@ -52,23 +55,18 @@ public void execute(Tuple tuple) { for(Flow flow : (Collection)tuple.getValue(0)) flows.put(flow.getId(), flow); } else if(!"tick".equals(tuple.getSourceStreamId())) { - String flowId = tuple.getStringByField(FLOW_ID); - Event event = (Event) tuple.getValueByField(EVENT); - int idx = tuple.getIntegerByField(FLOW_OP_IDX); - String streamName = tuple.getStringByField(STREAM_NAME); - String previousStream = tuple.getStringByField(LAST_STREAM); - idx++; + FlowInfo flowInfo = new FlowInfo(tuple); - Flow flow = flows.get(flowId); + Flow flow = flows.get(flowInfo.getFlowId()); if (flow != null) { - SelectOp selectOp = (SelectOp) flow.getStream(streamName).getFlowOps().get(idx); + SelectOp selectOp = (SelectOp) flow.getStream(flowInfo.getStreamName()).getFlowOps().get(flowInfo.getIdx()); - String nextStream = idx+1 < flow.getStream(streamName).getFlowOps().size() ? flow.getStream(streamName).getFlowOps().get(idx + 1).getComponentName() : "output"; + String nextStream = Utils.getNextStreamFromFlowInfo(flowInfo, flow); - Event newEvent = new BaseEvent(event.getId(), event.getTimestamp()); - for(org.calrissian.mango.domain.Tuple eventTuple : event.getTuples()) { + Event newEvent = new BaseEvent(flowInfo.getEvent().getId(), flowInfo.getEvent().getTimestamp()); + for(org.calrissian.mango.domain.Tuple eventTuple : flowInfo.getEvent().getTuples()) { if(selectOp.getFields().contains(eventTuple.getKey())) newEvent.put(eventTuple); } @@ -76,16 +74,16 @@ public void execute(Tuple tuple) { /** * If no selected tuples existed, event will not be emitted */ - if((nextStream.equals("output") && flow.getStream(streamName).isStdOutput()) || !nextStream.equals("output")) { + if((nextStream.equals("output") && flow.getStream(flowInfo.getStreamName()).isStdOutput()) || !nextStream.equals("output")) { if (newEvent.getTuples().size() > 0) - collector.emit(nextStream, tuple, new Values(flowId, newEvent, idx, streamName, previousStream)); + collector.emit(nextStream, tuple, new Values(flowInfo.getFlowId(), newEvent, flowInfo.getIdx(), flowInfo.getStreamName(), flowInfo.getPreviousStream())); } // send directly to any non std output streams - if(nextStream.equals("output") && flow.getStream(streamName).getOutputs() != null) { - for (String output : flow.getStream(streamName).getOutputs()) { + if(nextStream.equals("output") && flow.getStream(flowInfo.getStreamName()).getOutputs() != null) { + for (String output : flow.getStream(flowInfo.getStreamName()).getOutputs()) { String outputStream = flow.getStream(output).getFlowOps().get(0).getComponentName(); - collector.emit(outputStream, tuple, new Values(flowId, event, -1, output, streamName)); + collector.emit(outputStream, tuple, new Values(flowInfo.getFlowId(), flowInfo.getEvent(), -1, output, flowInfo.getStreamName())); } } diff --git a/src/main/java/org/calrissian/flowmix/bolt/SortBolt.java b/src/main/java/org/calrissian/flowmix/bolt/SortBolt.java index bd63bdf..d6a3ee9 100644 --- a/src/main/java/org/calrissian/flowmix/bolt/SortBolt.java +++ b/src/main/java/org/calrissian/flowmix/bolt/SortBolt.java @@ -15,6 +15,13 @@ */ package org.calrissian.flowmix.bolt; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; @@ -24,20 +31,19 @@ import backtype.storm.tuple.Values; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import org.calrissian.flowmix.model.*; +import org.calrissian.flowmix.model.Flow; +import org.calrissian.flowmix.model.FlowInfo; +import org.calrissian.flowmix.model.Policy; +import org.calrissian.flowmix.model.StreamDef; import org.calrissian.flowmix.model.op.FlowOp; import org.calrissian.flowmix.model.op.SortOp; -import org.calrissian.flowmix.support.*; +import org.calrissian.flowmix.support.EventSortByComparator; +import org.calrissian.flowmix.support.Utils; import org.calrissian.flowmix.support.window.SortedWindow; import org.calrissian.flowmix.support.window.Window; import org.calrissian.flowmix.support.window.WindowItem; -import org.calrissian.mango.domain.event.Event; - -import java.util.*; -import java.util.concurrent.TimeUnit; import static java.util.Collections.singleton; -import static org.calrissian.flowmix.Constants.*; import static org.calrissian.flowmix.FlowmixFactory.declareOutputStreams; import static org.calrissian.flowmix.FlowmixFactory.fields; import static org.calrissian.flowmix.spout.MockFlowLoaderSpout.FLOW_LOADER_STREAM; @@ -132,8 +138,10 @@ public void execute(Tuple tuple) { if(op.getTriggerPolicy() == Policy.TIME) window.incrTriggerTicks(); - if(window.getTriggerTicks() == op.getTriggerThreshold()) - emitWindow(flow, curStream.getName(), op, window, idx); + if(window.getTriggerTicks() == op.getTriggerThreshold()) { + FlowInfo flowInfo = new FlowInfo(flow.getId(), curStream.getName(), idx); + emitWindow(tuple, flowInfo, flow, op, window); + } } } @@ -160,22 +168,16 @@ public void execute(Tuple tuple) { * The hashKey was added to the "fieldsGrouping" in an attempt to share pointers where possible. Different * rules with like fields groupings can store the items in their windows on the same node. */ - String flowId = tuple.getStringByField(FLOW_ID); - String hash = tuple.contains(PARTITION) ? tuple.getStringByField(PARTITION) : ""; - Event event = (Event) tuple.getValueByField(EVENT); - int idx = tuple.getIntegerByField(FLOW_OP_IDX); - String streamName = tuple.getStringByField(STREAM_NAME); - String previousStream = tuple.getStringByField(LAST_STREAM); - idx++; + FlowInfo flowInfo = new FlowInfo(tuple); - Flow flow = flowMap.get(flowId); + Flow flow = flowMap.get(flowInfo.getFlowId()); - SortOp op = (SortOp) flow.getStream(streamName).getFlowOps().get(idx); + SortOp op = (SortOp) flow.getStream(flowInfo.getStreamName()).getFlowOps().get(flowInfo.getIdx()); - Cache buffersForRule = windows.get(flow.getId() + "\0" + streamName + "\0" + idx); + Cache buffersForRule = windows.get(flow.getId() + "\0" + flowInfo.getStreamName() + "\0" + flowInfo.getIdx()); SortedWindow buffer; if (buffersForRule != null) { - buffer = buffersForRule.getIfPresent(hash); + buffer = buffersForRule.getIfPresent(flowInfo.getPartition()); if (buffer != null) { // if we have a buffer already, process it if(op.getEvictionPolicy() == Policy.TIME) @@ -183,21 +185,21 @@ public void execute(Tuple tuple) { } else { buffersForRule = CacheBuilder.newBuilder().expireAfterAccess(60, TimeUnit.MINUTES).build(); // just in case we get some rogue data, we don't wan ti to sit for too long. - buffer = buildWindow(hash, op); - buffersForRule.put(hash, buffer); - windows.put(flow.getId() + "\0" + streamName + "\0" + idx, buffersForRule); + buffer = buildWindow(flowInfo.getPartition(), op); + buffersForRule.put(flowInfo.getPartition(), buffer); + windows.put(flow.getId() + "\0" + flowInfo.getStreamName() + "\0" + flowInfo.getIdx(), buffersForRule); } } else { buffersForRule = CacheBuilder.newBuilder().expireAfterAccess(60, TimeUnit.MINUTES).build(); // just in case we get some rogue data, we don't wan ti to sit for too long. - buffer = buildWindow(hash, op); - buffersForRule.put(hash, buffer); - windows.put(flow.getId() + "\0" + streamName + "\0" + idx, buffersForRule); + buffer = buildWindow(flowInfo.getPartition(), op); + buffersForRule.put(flowInfo.getPartition(), buffer); + windows.put(flow.getId() + "\0" + flowInfo.getStreamName() + "\0" + flowInfo.getIdx(), buffersForRule); } if(op.getEvictionPolicy() == Policy.COUNT && op.getEvictionThreshold() == buffer.size()) buffer.expire(); - buffer.add(event, previousStream); + buffer.add(flowInfo.getEvent(), flowInfo.getPreviousStream()); /** * Perform count-based trigger if necessary @@ -206,9 +208,9 @@ public void execute(Tuple tuple) { buffer.incrTriggerTicks(); if(buffer.getTriggerTicks() == op.getTriggerThreshold()) - emitWindow(flow, streamName, op, buffer, idx); + emitWindow(tuple, flowInfo, flow, op, buffer); } else if(op.getTriggerPolicy() == Policy.TIME_DELTA_LT && buffer.timeRange() > -1 && buffer.timeRange() <= op.getTriggerThreshold() * 1000) - emitWindow(flow, streamName, op, buffer, idx); + emitWindow(tuple, flowInfo, flow, op, buffer); // /** // * If we aren't supposed to clear the window right now, then we need to emit @@ -225,8 +227,7 @@ public void execute(Tuple tuple) { collector.ack(tuple); } - private void emitWindow(Flow flow, String streamName, SortOp op, Window window, int idx) { - String nextStream = idx+1 < flow.getStream(streamName).getFlowOps().size() ? flow.getStream(streamName).getFlowOps().get(idx + 1).getComponentName() : "output"; + private void emitWindow(Tuple tuple, FlowInfo flowInfo, Flow flow, SortOp op, Window window) { /** * If the window is set to be cleared, we need to emit everything. Otherwise, just emit the last item in the list. @@ -243,14 +244,16 @@ private void emitWindow(Flow flow, String streamName, SortOp op, Window window, if(items != null) { for(WindowItem item : items) { - if((nextStream.equals("output") && flow.getStream(streamName).isStdOutput()) || !nextStream.equals("output")) - collector.emit(nextStream, new Values(flow.getId(), item.getEvent(), idx, streamName, item.getPreviousStream())); + + String nextStream = Utils.getNextStreamFromFlowInfo(flowInfo, flow); + if((nextStream.equals("output") && flow.getStream(flowInfo.getStreamName()).isStdOutput()) || !nextStream.equals("output")) + collector.emit(nextStream, new Values(flow.getId(), item.getEvent(), flowInfo.getIdx(), flowInfo.getStreamName(), item.getPreviousStream())); // send directly to any non std output streams - if(nextStream.equals("output") && flow.getStream(streamName).getOutputs() != null) { - for (String output : flow.getStream(streamName).getOutputs()) { + if(nextStream.equals("output") && flow.getStream(flowInfo.getStreamName()).getOutputs() != null) { + for (String output : flow.getStream(flowInfo.getStreamName()).getOutputs()) { String outputStream = flow.getStream(output).getFlowOps().get(0).getComponentName(); - collector.emit(outputStream, new Values(flow.getId(), item.getEvent(), -1, output, streamName)); + collector.emit(outputStream, new Values(flow.getId(), item.getEvent(), -1, output, flowInfo.getStreamName())); } } } diff --git a/src/main/java/org/calrissian/flowmix/bolt/SplitBolt.java b/src/main/java/org/calrissian/flowmix/bolt/SplitBolt.java index 9b77a74..cd795a7 100644 --- a/src/main/java/org/calrissian/flowmix/bolt/SplitBolt.java +++ b/src/main/java/org/calrissian/flowmix/bolt/SplitBolt.java @@ -15,6 +15,10 @@ */ package org.calrissian.flowmix.bolt; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; @@ -23,16 +27,12 @@ import backtype.storm.tuple.Values; import org.calrissian.flowmix.FlowmixFactory; import org.calrissian.flowmix.model.Flow; +import org.calrissian.flowmix.model.FlowInfo; import org.calrissian.flowmix.model.op.SplitOp; import org.calrissian.flowmix.support.Filter; +import org.calrissian.flowmix.support.Utils; import org.calrissian.mango.domain.Pair; -import org.calrissian.mango.domain.event.Event; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import static org.calrissian.flowmix.Constants.*; import static org.calrissian.flowmix.FlowmixFactory.fields; import static org.calrissian.flowmix.spout.MockFlowLoaderSpout.FLOW_LOADER_STREAM; @@ -55,33 +55,27 @@ public void execute(Tuple tuple) { flows.put(flow.getId(), flow); } else if (!"tick".equals(tuple.getSourceStreamId())) { - String flowId = tuple.getStringByField(FLOW_ID); - Event event = (Event) tuple.getValueByField(EVENT); - int idx = tuple.getIntegerByField(FLOW_OP_IDX); - String streamName = tuple.getStringByField(STREAM_NAME); - String previousStream = tuple.getStringByField(LAST_STREAM); - idx++; + FlowInfo flowInfo = new FlowInfo(tuple); - Flow flow = flows.get(flowId); + Flow flow = flows.get(flowInfo.getFlowId()); if (flow != null) { - SplitOp splitOp = (SplitOp) flow.getStream(streamName).getFlowOps().get(idx); - - String nextStream = - idx + 1 < flow.getStream(streamName).getFlowOps().size() ? flow.getStream(streamName).getFlowOps().get(idx + 1).getComponentName() : "output"; + SplitOp splitOp = (SplitOp) flow.getStream(flowInfo.getStreamName()).getFlowOps().get(flowInfo.getIdx()); + String nextStream = Utils.getNextStreamFromFlowInfo(flowInfo, flow); // first check the default path Filter filter = splitOp.getDefaultPath(); - if(filter != null && filter.accept(event)) { - if ((nextStream.equals("output") && flow.getStream(streamName).isStdOutput()) || !nextStream.equals("output")) - collector.emit(nextStream, tuple, new Values(flowId, event, idx, streamName, previousStream)); + + if(filter != null && filter.accept(flowInfo.getEvent())) { + if ((nextStream.equals("output") && flow.getStream(flowInfo.getStreamName()).isStdOutput()) || !nextStream.equals("output")) + collector.emit(nextStream, tuple, new Values(flowInfo.getFlowId(), flowInfo.getEvent(), flowInfo.getIdx(), flowInfo.getStreamName(), flowInfo.getPreviousStream())); // send directly to any non std output streams - if (nextStream.equals("output") && flow.getStream(streamName).getOutputs() != null) { - for (String output : flow.getStream(streamName).getOutputs()) { + if (nextStream.equals("output") && flow.getStream(flowInfo.getStreamName()).getOutputs() != null) { + for (String output : flow.getStream(flowInfo.getStreamName()).getOutputs()) { String outputStream = flow.getStream(output).getFlowOps().get(0).getComponentName(); - collector.emit(outputStream, tuple, new Values(flowId, event, -1, output, streamName)); + collector.emit(outputStream, tuple, new Values(flowInfo.getFlowId(), flowInfo.getEvent(), -1, output, flowInfo.getStreamName())); } } } @@ -89,9 +83,9 @@ public void execute(Tuple tuple) { // then check all other paths if(splitOp.getPaths() != null) { for(Pair pathPair : splitOp.getPaths()) { - if(pathPair.getOne().accept(event)) { + if(pathPair.getOne().accept(flowInfo.getEvent())) { String outputStream = flow.getStream(pathPair.getTwo()).getFlowOps().get(0).getComponentName(); - collector.emit(outputStream, tuple, new Values(flowId, event, -1, pathPair.getTwo(), streamName)); + collector.emit(outputStream, tuple, new Values(flowInfo.getFlowId(), flowInfo.getEvent(), -1, pathPair.getTwo(), flowInfo.getStreamName())); } } } diff --git a/src/main/java/org/calrissian/flowmix/bolt/SwitchBolt.java b/src/main/java/org/calrissian/flowmix/bolt/SwitchBolt.java index e33fb77..0fdf2a3 100644 --- a/src/main/java/org/calrissian/flowmix/bolt/SwitchBolt.java +++ b/src/main/java/org/calrissian/flowmix/bolt/SwitchBolt.java @@ -16,26 +16,29 @@ package org.calrissian.flowmix.bolt; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.calrissian.flowmix.model.Flow; +import org.calrissian.flowmix.model.FlowInfo; import org.calrissian.flowmix.model.Policy; import org.calrissian.flowmix.model.StreamDef; import org.calrissian.flowmix.model.op.FlowOp; import org.calrissian.flowmix.model.op.SwitchOp; +import org.calrissian.flowmix.support.Utils; import org.calrissian.flowmix.support.window.SwitchWindow; -import org.calrissian.mango.domain.event.Event; -import java.util.*; -import java.util.concurrent.TimeUnit; - -import static org.calrissian.flowmix.Constants.*; import static org.calrissian.flowmix.FlowmixFactory.declareOutputStreams; import static org.calrissian.flowmix.FlowmixFactory.fields; import static org.calrissian.flowmix.spout.MockFlowLoaderSpout.FLOW_LOADER_STREAM; @@ -130,7 +133,6 @@ public void execute(Tuple tuple) { boolean justOpened = false; if(op.getClosePolicy() == Policy.TIME && buffer.isStopped()) { if(buffer.getStopTicks() == op.getCloseThreshold()) { - System.out.println("OPENING" + " " + System.currentTimeMillis()); buffer.setStopped(false); buffer.resetStopTicks(); } else { @@ -141,11 +143,9 @@ public void execute(Tuple tuple) { if(!justOpened) if(op.getEvictionPolicy() == Policy.TIME && !buffer.isStopped()) { if(buffer.getEvictionTicks() == op.getEvictionThreshold()) { - System.out.println("IN HERE!"); activateOpenPolicy(buffer, op); } else { buffer.incrementEvictionTicks(); - System.out.println("EVICT TICKS: " + buffer.getEvictionTicks() + " - " + System.currentTimeMillis()); } } } @@ -174,22 +174,16 @@ public void execute(Tuple tuple) { * The hashKey was added to the "fieldsGrouping" in an attempt to share pointers where possible. Different * rules with like fields groupings can store the items in their windows on the same node. */ - String flowId = tuple.getStringByField(FLOW_ID); - String hash = tuple.getStringByField(PARTITION); - Event event = (Event) tuple.getValueByField(EVENT); - int idx = tuple.getIntegerByField(FLOW_OP_IDX); - String streamName = tuple.getStringByField(STREAM_NAME); - String previousStream = tuple.getStringByField(LAST_STREAM); - idx++; + FlowInfo flowInfo = new FlowInfo(tuple); - Flow flow = flowMap.get(flowId); + Flow flow = flowMap.get(flowInfo.getFlowId()); - SwitchOp op = (SwitchOp) flow.getStream(streamName).getFlowOps().get(idx); + SwitchOp op = (SwitchOp) flow.getStream(flowInfo.getStreamName()).getFlowOps().get(flowInfo.getIdx()); - Cache buffersForRule = windows.get(flow.getId() + "\0" + streamName + "\0" + idx); + Cache buffersForRule = windows.get(flow.getId() + "\0" + flowInfo.getStreamName() + "\0" + flowInfo.getIdx()); SwitchWindow buffer; if (buffersForRule != null) { - buffer = buffersForRule.getIfPresent(hash); + buffer = buffersForRule.getIfPresent(flowInfo.getPartition()); if (buffer != null) { // if we have a buffer already, process it @@ -201,16 +195,16 @@ public void execute(Tuple tuple) { buffer.timeEvict(op.getEvictionThreshold()); } } else { - buffer = op.getEvictionPolicy() == Policy.TIME ? new SwitchWindow(hash) : - new SwitchWindow(hash, op.getEvictionThreshold()); - buffersForRule.put(hash, buffer); + buffer = op.getEvictionPolicy() == Policy.TIME ? new SwitchWindow(flowInfo.getPartition()) : + new SwitchWindow(flowInfo.getPartition(), op.getEvictionThreshold()); + buffersForRule.put(flowInfo.getPartition(), buffer); } } else { buffersForRule = CacheBuilder.newBuilder().expireAfterAccess(60, TimeUnit.MINUTES).build(); // just in case we get some rogue data, we don't wan ti to sit for too long. - buffer = op.getEvictionPolicy() == Policy.TIME ? new SwitchWindow(hash) : - new SwitchWindow(hash, op.getEvictionThreshold()); - buffersForRule.put(hash, buffer); - windows.put(flow.getId() + "\0" + streamName + "\0" + idx, buffersForRule); + buffer = op.getEvictionPolicy() == Policy.TIME ? new SwitchWindow(flowInfo.getPartition()) : + new SwitchWindow(flowInfo.getPartition(), op.getEvictionThreshold()); + buffersForRule.put(flowInfo.getPartition(), buffer); + windows.put(flow.getId() + "\0" + flowInfo.getStreamName() + "\0" + flowInfo.getIdx(), buffersForRule); } if(buffer.isStopped()) { @@ -236,19 +230,8 @@ public void execute(Tuple tuple) { } if(!buffer.isStopped()) { - buffer.add(event, previousStream); - String nextStream = idx+1 < flow.getStream(streamName).getFlowOps().size() ? flow.getStream(streamName).getFlowOps().get(idx + 1).getComponentName() : "output"; - - if((nextStream.equals("output") && flow.getStream(streamName).isStdOutput()) || !nextStream.equals("output")) - collector.emit(nextStream, new Values(flow.getId(), event, idx, streamName, previousStream)); - - // send directly to any non std output streams - if(nextStream.equals("output") && flow.getStream(streamName).getOutputs() != null) { - for (String output : flow.getStream(streamName).getOutputs()) { - String outputStream = flow.getStream(output).getFlowOps().get(0).getComponentName(); - collector.emit(outputStream, tuple, new Values(flowId, event, -1, output, streamName)); - } - } + buffer.add(flowInfo.getEvent(), flowInfo.getPreviousStream()); + Utils.emitNext(tuple, flowInfo, flow, collector); } } @@ -274,7 +257,6 @@ private void activateOpenPolicy(SwitchWindow buffer, SwitchOp op) { if(op.getOpenPolicy() == Policy.TIME_DELTA_LT && buffer.timeRange() > -1 && buffer.timeRange() <= op.getOpenThreshold() * 1000) { if(isWindowFull(op, buffer)) { - System.out.println("CLOSING " + System.currentTimeMillis()); buffer.setStopped(true); buffer.clear(); buffer.resetEvictionTicks(); diff --git a/src/main/java/org/calrissian/flowmix/filter/CriteriaFilter.java b/src/main/java/org/calrissian/flowmix/filter/CriteriaFilter.java index 4284608..2dd8ff3 100644 --- a/src/main/java/org/calrissian/flowmix/filter/CriteriaFilter.java +++ b/src/main/java/org/calrissian/flowmix/filter/CriteriaFilter.java @@ -6,6 +6,9 @@ import static com.google.common.base.Preconditions.checkNotNull; +/** + * A filter that only allows events matching the criteria to pass through + */ public class CriteriaFilter implements Filter { Criteria criteria; diff --git a/src/main/java/org/calrissian/flowmix/filter/AllPassFilter.java b/src/main/java/org/calrissian/flowmix/filter/NoFilter.java similarity index 66% rename from src/main/java/org/calrissian/flowmix/filter/AllPassFilter.java rename to src/main/java/org/calrissian/flowmix/filter/NoFilter.java index c157c31..f954dbb 100644 --- a/src/main/java/org/calrissian/flowmix/filter/AllPassFilter.java +++ b/src/main/java/org/calrissian/flowmix/filter/NoFilter.java @@ -3,7 +3,10 @@ import org.calrissian.flowmix.support.Filter; import org.calrissian.mango.domain.event.Event; -public class AllPassFilter implements Filter { +/** + * A filter that accepts everything passed through it + */ +public class NoFilter implements Filter { @Override public boolean accept(Event event) { return true; } diff --git a/src/main/java/org/calrissian/flowmix/model/FlowInfo.java b/src/main/java/org/calrissian/flowmix/model/FlowInfo.java new file mode 100644 index 0000000..af279f3 --- /dev/null +++ b/src/main/java/org/calrissian/flowmix/model/FlowInfo.java @@ -0,0 +1,63 @@ +package org.calrissian.flowmix.model; + +import backtype.storm.tuple.Tuple; +import org.calrissian.mango.domain.event.Event; + +import static org.calrissian.flowmix.Constants.EVENT; +import static org.calrissian.flowmix.Constants.FLOW_ID; +import static org.calrissian.flowmix.Constants.FLOW_OP_IDX; +import static org.calrissian.flowmix.Constants.LAST_STREAM; +import static org.calrissian.flowmix.Constants.PARTITION; +import static org.calrissian.flowmix.Constants.STREAM_NAME; + +public class FlowInfo { + + private String flowId; + private Event event; + private int idx; + private String streamName; + private String previousStream; + private String partition; + + public FlowInfo(Tuple tuple) { + flowId = tuple.getStringByField(FLOW_ID); + event = (Event) tuple.getValueByField(EVENT); + idx = tuple.getIntegerByField(FLOW_OP_IDX); + idx++; + streamName = tuple.getStringByField(STREAM_NAME); + previousStream = tuple.getStringByField(LAST_STREAM); + + if(tuple.contains(PARTITION)) + partition = tuple.getStringByField(PARTITION); + } + + public FlowInfo(String flowId, String stream, int idx) { + this.flowId = flowId; + this.streamName = stream; + this.idx = idx; + } + + public String getFlowId() { + return flowId; + } + + public Event getEvent() { + return event; + } + + public int getIdx() { + return idx; + } + + public String getStreamName() { + return streamName; + } + + public String getPreviousStream() { + return previousStream; + } + + public String getPartition() { + return partition; + } +} diff --git a/src/main/java/org/calrissian/flowmix/model/builder/SplitBuilder.java b/src/main/java/org/calrissian/flowmix/model/builder/SplitBuilder.java index 2c94589..560a61e 100644 --- a/src/main/java/org/calrissian/flowmix/model/builder/SplitBuilder.java +++ b/src/main/java/org/calrissian/flowmix/model/builder/SplitBuilder.java @@ -1,6 +1,6 @@ package org.calrissian.flowmix.model.builder; -import org.calrissian.flowmix.filter.AllPassFilter; +import org.calrissian.flowmix.filter.NoFilter; import org.calrissian.flowmix.model.op.SplitOp; import org.calrissian.flowmix.support.Filter; import org.calrissian.mango.domain.Pair; @@ -28,7 +28,7 @@ public SplitBuilder path(Filter filter, String destinationStream) { } public SplitBuilder all(String destinationStream) { - this.paths.add(new Pair(new AllPassFilter(), destinationStream)); + this.paths.add(new Pair(new NoFilter(), destinationStream)); return this; } diff --git a/src/main/java/org/calrissian/flowmix/support/Utils.java b/src/main/java/org/calrissian/flowmix/support/Utils.java index 5d22138..9cc2246 100644 --- a/src/main/java/org/calrissian/flowmix/support/Utils.java +++ b/src/main/java/org/calrissian/flowmix/support/Utils.java @@ -23,6 +23,10 @@ import java.util.SortedSet; import java.util.TreeSet; +import backtype.storm.task.OutputCollector; +import backtype.storm.tuple.Values; +import org.calrissian.flowmix.model.Flow; +import org.calrissian.flowmix.model.FlowInfo; import org.calrissian.mango.domain.Tuple; import org.calrissian.mango.domain.event.Event; import org.calrissian.mango.types.TypeRegistry; @@ -63,15 +67,36 @@ public static String buildKeyIndexForEvent(Event event, List groupBy) { public static String buildKeyIndexForEvent(String flowId, Event event, List groupBy) { - return flowId + buildKeyIndexForEvent(event, groupBy); - } + return flowId + buildKeyIndexForEvent(event, groupBy); + } + + public static String hashString(String string) throws NoSuchAlgorithmException, UnsupportedEncodingException { + MessageDigest md = MessageDigest.getInstance("MD5"); byte[] hash = md.digest(string.getBytes("UTF-8")); + //converting byte array to Hexadecimal String + StringBuilder sb = new StringBuilder(2*hash.length); + for(byte b : hash) + sb.append(String.format("%02x", b&0xff)); + return sb.toString(); + } + + public static String getNextStreamFromFlowInfo(FlowInfo flowInfo, Flow flow) { + return flowInfo.getIdx()+1 < flow.getStream(flowInfo.getStreamName()).getFlowOps().size() ? + flow.getStream(flowInfo.getStreamName()).getFlowOps().get(flowInfo.getIdx() + 1).getComponentName() : "output"; + } - public static String hashString(String string) throws NoSuchAlgorithmException, UnsupportedEncodingException { - MessageDigest md = MessageDigest.getInstance("MD5"); byte[] hash = md.digest(string.getBytes("UTF-8")); - //converting byte array to Hexadecimal String - StringBuilder sb = new StringBuilder(2*hash.length); - for(byte b : hash) - sb.append(String.format("%02x", b&0xff)); - return sb.toString(); + public static void emitNext(backtype.storm.tuple.Tuple tuple, FlowInfo flowInfo, Flow flow, OutputCollector collector) { + String nextStream = getNextStreamFromFlowInfo(flowInfo, flow); + + if((nextStream.equals("output") && flow.getStream(flowInfo.getStreamName()).isStdOutput()) || !nextStream.equals("output")) + collector.emit(nextStream, tuple, new Values(flow.getId(), flowInfo.getEvent(), flowInfo.getIdx(), flowInfo.getStreamName(), flowInfo.getPreviousStream())); + + // send directly to any non std output streams + if(nextStream.equals("output") && flow.getStream(flowInfo.getStreamName()).getOutputs() != null) { + for (String output : flow.getStream(flowInfo.getStreamName()).getOutputs()) { + String outputStream = flow.getStream(output).getFlowOps().get(0).getComponentName(); + collector.emit(outputStream, tuple, new Values(flowInfo.getFlowId(), flowInfo.getEvent(), -1, output, flowInfo.getStreamName())); + } } + } + }