STORM-4154 - fix: Nimbus downtime during topology deployment (#3833)
* fix: Nimbus downtime during topology deployment

* add tests for method
DiogoP98 authored Jan 14, 2025
1 parent 6f86af4 commit af46999
Showing 2 changed files with 97 additions and 7 deletions.
@@ -308,7 +308,7 @@ public static List<ACL> getNimbusAcls(Map<String, Object> conf) {
NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal());
NIMBUS_SUBJECT.setReadOnly();
}

private static final TopologyStateTransition NOOP_TRANSITION = (arg, nimbus, topoId, base) -> null;
private static final TopologyStateTransition INACTIVE_TRANSITION = (arg, nimbus, topoId, base) -> Nimbus.make(TopologyStatus.INACTIVE);
private static final TopologyStateTransition ACTIVE_TRANSITION = (arg, nimbus, topoId, base) -> Nimbus.make(TopologyStatus.ACTIVE);
@@ -3218,7 +3218,7 @@ private CommonTopoInfo getCommonTopoInfo(String topoId, String operation) throws
ret.allComponents = new HashSet<>(ret.taskToComponent.values());
return ret;
}

@VisibleForTesting
public boolean awaitLeadership(long timeout, TimeUnit timeUnit) throws InterruptedException {
return leaderElector.awaitLeadership(timeout, timeUnit);
@@ -4109,6 +4109,8 @@ public void createStateInZookeeper(String key) throws TException {
state.setupBlob(key, ni, getVersionForKey(key, ni, zkClient));
}
LOG.debug("Created state in zookeeper {} {} {}", state, store, ni);
} catch (KeyNotFoundException e) {
LOG.warn("Key not found while creating state in zookeeper - key: " + key, e);
} catch (Exception e) {
LOG.warn("Exception while creating state in zookeeper - key: " + key, e);
if (e instanceof TException) {
@@ -5313,7 +5315,7 @@ private static class CommonTopoInfo {
private static class ClusterSummaryMetrics implements MetricSet {
private static final String SUMMARY = "summary";
private final Map<String, com.codahale.metrics.Metric> metrics = new HashMap<>();

public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
return metrics.put(MetricRegistry.name(SUMMARY, key), value);
}
@@ -5323,12 +5325,12 @@ public Map<String, com.codahale.metrics.Metric> getMetrics() {
return metrics;
}
}

private class ClusterSummaryMetricSet implements Runnable {
private static final int CACHING_WINDOW = 5;

private final ClusterSummaryMetrics clusterSummaryMetrics = new ClusterSummaryMetrics();

private final Function<String, Histogram> registerHistogram = (name) -> {
//This histogram reflects the data distribution across only one ClusterSummary, i.e.,
// data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment.
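The createStateInZookeeper hunk above carries the actual fix. KeyNotFoundException is a Thrift-generated exception (it extends TException), and the pre-existing handler appears to rethrow TExceptions after logging them, so a blob key that vanished while a topology was being deployed escaped the method as an error; the new catch clause downgrades that case to a warning. Below is a minimal, self-contained sketch of the pattern under stand-in names (MissingKeyException, BlobStateSync, setupBlob and createState are illustrative, not the Storm APIs):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Stand-in for the Thrift KeyNotFoundException that Nimbus handles.
class MissingKeyException extends Exception {
    MissingKeyException(String key) {
        super("No blob state for key: " + key);
    }
}

class BlobStateSync {
    private static final Logger LOG = LoggerFactory.getLogger(BlobStateSync.class);

    // Pretend ZooKeeper bookkeeping; simulates the key disappearing mid-deployment.
    private void setupBlob(String key) throws MissingKeyException {
        throw new MissingKeyException(key);
    }

    // The pattern the patch applies: a vanished key is benign, everything else still propagates.
    public void createState(String key) {
        try {
            setupBlob(key);
        } catch (MissingKeyException e) {
            // New behavior: log a warning and keep the daemon running.
            LOG.warn("Key not found while creating state - key: " + key, e);
        } catch (Exception e) {
            // Pre-existing behavior, kept for anything that is not a missing key.
            LOG.warn("Exception while creating state - key: " + key, e);
            throw new RuntimeException(e);
        }
    }
}

The specific catch has to come before the generic one; that ordering is what keeps the missing-key case out of the rethrowing branch that previously caused the Nimbus downtime.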
@@ -25,24 +25,75 @@

import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.KeySequenceNumber;
import org.apache.storm.blobstore.LocalFsBlobStore;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.scheduler.INimbus;
import org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategyOld;
import org.apache.storm.scheduler.resource.strategies.scheduling.RoundRobinResourceAwareStrategy;
import org.apache.storm.security.auth.IGroupMappingServiceProvider;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.thrift.TException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.MockitoAnnotations;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class NimbusTest {
private static final String BLOB_FILE_KEY = "file-key";

@Mock
private StormMetricsRegistry metricRegistry;
@Mock
private INimbus iNimbus;
@Mock
private IStormClusterState stormClusterState;
@Mock
private NimbusInfo nimbusInfo;
@Mock
private LocalFsBlobStore localBlobStore;
@Mock
private ILeaderElector leaderElector;
@Mock
private IGroupMappingServiceProvider groupMapper;

private Nimbus nimbus;

@BeforeEach
public void setUp() throws Exception {
MockitoAnnotations.openMocks(this).close();

Map<String, Object> conf = Map.of(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS, 10);
nimbus = new Nimbus(conf, iNimbus, stormClusterState, nimbusInfo, localBlobStore, leaderElector, groupMapper, metricRegistry);
}

public class NimbusTest {
@Test
public void testMemoryLoadLargerThanMaxHeapSize() {
// Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=128.0 < 129.0,
@@ -112,4 +163,41 @@ public void validateNoTopoConfOverrides() {
Map<String, Object> normalized = Nimbus.normalizeConf(conf, topoConf, topology);
assertNull(normalized.get(Config.STORM_WORKERS_ARTIFACTS_DIR));
}

@Test
void testCreateStateInZookeeper() throws TException {
nimbus.createStateInZookeeper(BLOB_FILE_KEY);

verify(stormClusterState).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), any());
}

@Test
void testCreateStateInZookeeperWithoutLocalFsBlobStoreInstanceShouldNotCreate() throws Exception {
BlobStore blobStore = mock(BlobStore.class);
Map<String, Object> conf = Map.of(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS, 10);
nimbus = new Nimbus(conf, iNimbus, stormClusterState, nimbusInfo, blobStore, leaderElector, groupMapper, metricRegistry);

nimbus.createStateInZookeeper(BLOB_FILE_KEY);

verify(stormClusterState, never()).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), any());
}

@Test
void testCreateStateInZookeeperWhenFailToSetupBlobWithRuntimeExceptionThrowsRuntimeException() {
doThrow(new RuntimeException("Failed to setup blob")).when(stormClusterState).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), any());

assertThrows(RuntimeException.class, () -> nimbus.createStateInZookeeper(BLOB_FILE_KEY));
verify(stormClusterState).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), any());
}

@Test
void testCreateStateInZookeeperWhenKeyNotFoundHandlesException() throws Exception {
try (MockedConstruction<KeySequenceNumber> keySequenceNumber = mockConstruction(KeySequenceNumber.class, (mock, context) ->
when(mock.getKeySequenceNumber(any())).thenThrow(new KeyNotFoundException("Failed to setup blob")))) {
nimbus.createStateInZookeeper(BLOB_FILE_KEY);

verify(keySequenceNumber.constructed().get(0)).getKeySequenceNumber(any());
verify(stormClusterState, never()).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), any());
}
}
}
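
The final test exercises the new catch branch without a real blob store: Mockito's mockConstruction intercepts the KeySequenceNumber instance that createStateInZookeeper constructs internally, so its getKeySequenceNumber call can be made to throw KeyNotFoundException. Note this requires the inline mock maker (mockito-inline, or Mockito 5+ where it is the default). A self-contained sketch of the mechanism with illustrative classes (Sequencer and Caller are not Storm types):

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.Test;
import org.mockito.MockedConstruction;

class MockedConstructionSketchTest {

    // Collaborator that the code under test news up internally.
    static class Sequencer {
        int next(String key) {
            return 1;
        }
    }

    // Code under test: it builds its own Sequencer, so the dependency cannot be injected.
    static class Caller {
        int resolve(String key) {
            return new Sequencer().next(key);
        }
    }

    @Test
    void constructionIsIntercepted() {
        // Every `new Sequencer()` created inside this scope yields a mock configured by the initializer.
        try (MockedConstruction<Sequencer> mocked = mockConstruction(Sequencer.class,
                (mock, context) -> when(mock.next(anyString())).thenReturn(42))) {
            assertEquals(42, new Caller().resolve("file-key"));
            assertEquals(1, mocked.constructed().size());
        }
    }
}

Everything constructed inside the try-with-resources scope is replaced by a mock, and mocked.constructed() records each instance, which is how the Storm test above verifies that getKeySequenceNumber was reached while setupBlob was never called.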
