From 5f24acc489bfccc21ed11a0a49711e3e5cfacda0 Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Sat, 11 Jan 2025 16:10:15 +0000 Subject: [PATCH] Porting changes from STORM-3693 --- .../src/jvm/org/apache/storm/executor/Executor.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java index b3df9c9e631..2d287c77739 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java @@ -285,6 +285,8 @@ public void accept(Object event) { int taskId = addressedTuple.getDest(); TupleImpl tuple = (TupleImpl) addressedTuple.getTuple(); + String streamId = tuple.getSourceStreamId(); + boolean isSpout = this instanceof SpoutExecutor; if (isDebug) { LOG.info("Processing received TUPLE: {} for TASK: {} ", tuple, taskId); } @@ -292,6 +294,10 @@ public void accept(Object event) { try { if (taskId != AddressedTuple.BROADCAST_DEST) { tupleActionFn(taskId, tuple); + } else if (isSpout && streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) { + //taskId is irrelevant here. Ensures pending.rotate() is called once per tick. + tupleActionFn(taskIds.get(0), tuple); + } else { for (Integer t : taskIds) { tupleActionFn(t, tuple);