Skip to content

Commit

Permalink
Refactoring changes proposed on STORM-3693 and adding an unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
reiabreu committed Jan 18, 2025
1 parent e596015 commit 9227922
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 13 deletions.
10 changes: 4 additions & 6 deletions storm-client/src/jvm/org/apache/storm/executor/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,19 +285,17 @@ public void accept(Object event) {
int taskId = addressedTuple.getDest();

TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
String streamId = tuple.getSourceStreamId();
boolean isSpout = this instanceof SpoutExecutor;
if (isDebug) {
LOG.info("Processing received TUPLE: {} for TASK: {} ", tuple, taskId);
}

acceptTupleAction(taskId, tuple);
}

protected void acceptTupleAction(int taskId, TupleImpl tuple) {
try {
if (taskId != AddressedTuple.BROADCAST_DEST) {
tupleActionFn(taskId, tuple);
} else if (isSpout && streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
//taskId is irrelevant here. Ensures pending.rotate() is called once per tick.
tupleActionFn(taskIds.get(0), tuple);

} else {
for (Integer t : taskIds) {
tupleActionFn(t, tuple);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
Expand All @@ -17,6 +17,7 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.ICredentialsListener;
Expand Down Expand Up @@ -80,7 +81,7 @@ public SpoutExecutor(final WorkerState workerData, final List<Long> executorId,
this.emittedCount = new MutableLong(0);
this.emptyEmitStreak = new MutableLong(0);
this.stats = new SpoutExecutorStats(
ConfigUtils.samplingRate(this.getTopoConf()), ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
ConfigUtils.samplingRate(this.getTopoConf()), ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
this.skippedMaxSpoutMs = workerData.getMetricRegistry().rateCounter("__skipped-max-spout-ms", componentId,
taskIds.get(0));
this.skippedInactiveMs = workerData.getMetricRegistry().rateCounter("__skipped-inactive-ms", componentId,
Expand Down Expand Up @@ -131,8 +132,8 @@ public void expire(Long key, TupleInfo tupleInfo) {
}
ISpout spoutObject = (ISpout) taskData.getTaskObject();
spoutOutputCollector = new SpoutOutputCollectorImpl(
spoutObject, this, taskData, emittedCount,
hasAckers, rand, hasEventLoggers, isDebug, pending);
spoutObject, this, taskData, emittedCount,
hasAckers, rand, hasEventLoggers, isDebug, pending);
SpoutOutputCollector outputCollector = new SpoutOutputCollector(spoutOutputCollector);
this.outputCollectors.add(outputCollector);

Expand Down Expand Up @@ -283,6 +284,29 @@ private void inactiveExecute() throws InterruptedException {
skippedInactiveMs.inc(Time.currentTimeMillis() - start);
}

@Override
protected void acceptTupleAction(int taskId, TupleImpl tuple) {

String streamId = tuple.getSourceStreamId();

try {
if (taskId != AddressedTuple.BROADCAST_DEST) {
tupleActionFn(taskId, tuple);
} else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
//taskId is irrelevant here. Ensures pending.rotate() is called once per tick.
tupleActionFn(taskIds.get(0), tuple);

} else {
for (Integer t : taskIds) {
tupleActionFn(t, tuple);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}

}

@Override
public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
String streamId = tuple.getSourceStreamId();
Expand Down Expand Up @@ -370,6 +394,6 @@ public int getSpoutRecvqCheckSkipCount() {

public long getThreadId() {
return threadId;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.apache.storm.executor;

import org.apache.storm.Constants;
import org.apache.storm.cluster.IStateStorage;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.executor.spout.SpoutExecutor;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.metrics2.RateCounter;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.utils.RotatingMap;
import org.apache.storm.utils.Utils;
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.util.ReflectionUtils;
import org.mockito.Mockito;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;

public class SpoutExecutorTest {


@Test
public void testPendingTuplesRotateShouldBeCalledOnlyOnce() throws Exception {

RateCounter rateCounter = Mockito.mock(RateCounter.class);

StormMetricRegistry stormMetricRegistry = Mockito.mock(StormMetricRegistry.class);
Mockito.when(stormMetricRegistry.rateCounter(anyString(),anyString(),anyInt())).thenReturn(rateCounter);

Map<String,Object> hashmap = Utils.readDefaultConfig();

IStateStorage stateStorage = Mockito.mock(IStateStorage.class);

ComponentCommon componentCommon = Mockito.mock(ComponentCommon.class);
Mockito.when(componentCommon.get_json_conf()).thenReturn(null);

WorkerTopologyContext workerTopologyContext = Mockito.mock(WorkerTopologyContext.class);
Mockito.when(workerTopologyContext.getComponentId(anyInt())).thenReturn("1");
Mockito.when(workerTopologyContext.getComponentCommon(anyString())).thenReturn(componentCommon);

WorkerState workerState = Mockito.mock(WorkerState.class);
Mockito.when(workerState.getWorkerTopologyContext()).thenReturn(workerTopologyContext);
Mockito.when(workerState.getStateStorage()).thenReturn(stateStorage);
Mockito.when(workerState.getTopologyConf()).thenReturn(hashmap);
Mockito.when(workerState.getMetricRegistry()).thenReturn(stormMetricRegistry);

SpoutExecutor spoutExecutor = new SpoutExecutor(workerState,List.of(1L,5L),new HashMap<>());

TupleImpl tuple = Mockito.mock(TupleImpl.class);
Mockito.when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID);
AddressedTuple addressedTuple = Mockito.mock(AddressedTuple.class);
Mockito.when(addressedTuple.getDest()).thenReturn(AddressedTuple.BROADCAST_DEST);
Mockito.when(addressedTuple.getTuple()).thenReturn(tuple);

RotatingMap rotatingMap = Mockito.mock(RotatingMap.class);
Field fieldRotatingMap = ReflectionUtils
.findFields(SpoutExecutor.class, f -> f.getName().equals("pending"),
ReflectionUtils.HierarchyTraversalMode.TOP_DOWN)
.get(0);
fieldRotatingMap.setAccessible(true);
fieldRotatingMap.set(spoutExecutor, rotatingMap);

spoutExecutor.accept(addressedTuple);

Mockito.verify(rotatingMap,Mockito.times(1)).rotate();
}
}

0 comments on commit 9227922

Please sign in to comment.