Skip to content

Commit

Permalink
Porting changes from STORM-3693
Browse files Browse the repository at this point in the history
  • Loading branch information
reiabreu committed Jan 11, 2025
1 parent 457194d commit 5f24acc
Showing 1 changed file with 6 additions and 0 deletions.
6 changes: 6 additions & 0 deletions storm-client/src/jvm/org/apache/storm/executor/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,19 @@ public void accept(Object event) {
int taskId = addressedTuple.getDest();

TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
String streamId = tuple.getSourceStreamId();
boolean isSpout = this instanceof SpoutExecutor;
if (isDebug) {
LOG.info("Processing received TUPLE: {} for TASK: {} ", tuple, taskId);
}

try {
if (taskId != AddressedTuple.BROADCAST_DEST) {
tupleActionFn(taskId, tuple);
} else if (isSpout && streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
//taskId is irrelevant here. Ensures pending.rotate() is called once per tick.
tupleActionFn(taskIds.get(0), tuple);

} else {
for (Integer t : taskIds) {
tupleActionFn(t, tuple);
Expand Down

0 comments on commit 5f24acc

Please sign in to comment.