diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 2d287c77739..562c49102de 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -285,19 +285,17 @@ public void accept(Object event) {
int taskId = addressedTuple.getDest();
TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
- String streamId = tuple.getSourceStreamId();
- boolean isSpout = this instanceof SpoutExecutor;
if (isDebug) {
LOG.info("Processing received TUPLE: {} for TASK: {} ", tuple, taskId);
}
+ acceptTupleAction(taskId, tuple);
+ }
+
+ protected void acceptTupleAction(int taskId, TupleImpl tuple) {
try {
if (taskId != AddressedTuple.BROADCAST_DEST) {
tupleActionFn(taskId, tuple);
- } else if (isSpout && streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
- //taskId is irrelevant here. Ensures pending.rotate() is called once per tick.
- tupleActionFn(taskIds.get(0), tuple);
-
} else {
for (Integer t : taskIds) {
tupleActionFn(t, tuple);
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index 25c84474a49..a4081fe6c42 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -2,9 +2,9 @@
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
@@ -17,6 +17,7 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.ICredentialsListener;
@@ -80,7 +81,7 @@ public SpoutExecutor(final WorkerState workerData, final List executorId,
this.emittedCount = new MutableLong(0);
this.emptyEmitStreak = new MutableLong(0);
this.stats = new SpoutExecutorStats(
- ConfigUtils.samplingRate(this.getTopoConf()), ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
+ ConfigUtils.samplingRate(this.getTopoConf()), ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
this.skippedMaxSpoutMs = workerData.getMetricRegistry().rateCounter("__skipped-max-spout-ms", componentId,
taskIds.get(0));
this.skippedInactiveMs = workerData.getMetricRegistry().rateCounter("__skipped-inactive-ms", componentId,
@@ -131,8 +132,8 @@ public void expire(Long key, TupleInfo tupleInfo) {
}
ISpout spoutObject = (ISpout) taskData.getTaskObject();
spoutOutputCollector = new SpoutOutputCollectorImpl(
- spoutObject, this, taskData, emittedCount,
- hasAckers, rand, hasEventLoggers, isDebug, pending);
+ spoutObject, this, taskData, emittedCount,
+ hasAckers, rand, hasEventLoggers, isDebug, pending);
SpoutOutputCollector outputCollector = new SpoutOutputCollector(spoutOutputCollector);
this.outputCollectors.add(outputCollector);
@@ -283,6 +284,29 @@ private void inactiveExecute() throws InterruptedException {
skippedInactiveMs.inc(Time.currentTimeMillis() - start);
}
+ @Override
+ protected void acceptTupleAction(int taskId, TupleImpl tuple) {
+
+ String streamId = tuple.getSourceStreamId();
+
+ try {
+ if (taskId != AddressedTuple.BROADCAST_DEST) {
+ tupleActionFn(taskId, tuple);
+ } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
+ //taskId is irrelevant here. Ensures pending.rotate() is called once per tick.
+ tupleActionFn(taskIds.get(0), tuple);
+
+ } else {
+ for (Integer t : taskIds) {
+ tupleActionFn(t, tuple);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
@Override
public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
String streamId = tuple.getSourceStreamId();
@@ -370,6 +394,6 @@ public int getSpoutRecvqCheckSkipCount() {
public long getThreadId() {
return threadId;
- }
-
+ }
+
}
diff --git a/storm-client/test/jvm/org/apache/storm/executor/SpoutExecutorTest.java b/storm-client/test/jvm/org/apache/storm/executor/SpoutExecutorTest.java
new file mode 100644
index 00000000000..d2871f6db10
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/executor/SpoutExecutorTest.java
@@ -0,0 +1,75 @@
+package org.apache.storm.executor;
+
+import org.apache.storm.Constants;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.metrics2.RateCounter;
+import org.apache.storm.metrics2.StormMetricRegistry;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.junit.platform.commons.util.ReflectionUtils;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+
+public class SpoutExecutorTest {
+
+
+ @Test
+ public void testPendingTuplesRotateShouldBeCalledOnlyOnce() throws Exception {
+
+ RateCounter rateCounter = Mockito.mock(RateCounter.class);
+
+ StormMetricRegistry stormMetricRegistry = Mockito.mock(StormMetricRegistry.class);
+ Mockito.when(stormMetricRegistry.rateCounter(anyString(),anyString(),anyInt())).thenReturn(rateCounter);
+
+ Map hashmap = Utils.readDefaultConfig();
+
+ IStateStorage stateStorage = Mockito.mock(IStateStorage.class);
+
+ ComponentCommon componentCommon = Mockito.mock(ComponentCommon.class);
+ Mockito.when(componentCommon.get_json_conf()).thenReturn(null);
+
+ WorkerTopologyContext workerTopologyContext = Mockito.mock(WorkerTopologyContext.class);
+ Mockito.when(workerTopologyContext.getComponentId(anyInt())).thenReturn("1");
+ Mockito.when(workerTopologyContext.getComponentCommon(anyString())).thenReturn(componentCommon);
+
+ WorkerState workerState = Mockito.mock(WorkerState.class);
+ Mockito.when(workerState.getWorkerTopologyContext()).thenReturn(workerTopologyContext);
+ Mockito.when(workerState.getStateStorage()).thenReturn(stateStorage);
+ Mockito.when(workerState.getTopologyConf()).thenReturn(hashmap);
+ Mockito.when(workerState.getMetricRegistry()).thenReturn(stormMetricRegistry);
+
+ SpoutExecutor spoutExecutor = new SpoutExecutor(workerState,List.of(1L,5L),new HashMap<>());
+
+ TupleImpl tuple = Mockito.mock(TupleImpl.class);
+ Mockito.when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID);
+ AddressedTuple addressedTuple = Mockito.mock(AddressedTuple.class);
+ Mockito.when(addressedTuple.getDest()).thenReturn(AddressedTuple.BROADCAST_DEST);
+ Mockito.when(addressedTuple.getTuple()).thenReturn(tuple);
+
+ RotatingMap rotatingMap = Mockito.mock(RotatingMap.class);
+ Field fieldRotatingMap = ReflectionUtils
+ .findFields(SpoutExecutor.class, f -> f.getName().equals("pending"),
+ ReflectionUtils.HierarchyTraversalMode.TOP_DOWN)
+ .get(0);
+ fieldRotatingMap.setAccessible(true);
+ fieldRotatingMap.set(spoutExecutor, rotatingMap);
+
+ spoutExecutor.accept(addressedTuple);
+
+ Mockito.verify(rotatingMap,Mockito.times(1)).rotate();
+ }
+}