Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance Remove DataNode Test #13809

Merged
merged 11 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -306,30 +306,6 @@ public void generalTestWithAllOptions(
}
}

/**
 * Restarts every given DataNode concurrently. For each node: stop it, block until it is
 * observed down, start it again, block until it reports alive, then pause briefly so the
 * node has time to settle before the caller proceeds.
 *
 * @param dataNodeWrappers the DataNode processes to bounce; each is handled on its own
 *     parallel-stream worker
 */
private void restartDataNodes(List<DataNodeWrapper> dataNodeWrappers) {
  dataNodeWrappers.parallelStream()
      .forEach(
          wrapper -> {
            // Stop, then poll until the process is really gone (up to 1 minute).
            wrapper.stop();
            Awaitility.await()
                .pollDelay(2, TimeUnit.SECONDS)
                .atMost(1, TimeUnit.MINUTES)
                .until(() -> !wrapper.isAlive());
            LOGGER.info("Node {} stopped.", wrapper.getId());

            // Start, then poll until the process reports alive again.
            wrapper.start();
            Awaitility.await()
                .pollDelay(2, TimeUnit.SECONDS)
                .atMost(1, TimeUnit.MINUTES)
                .until(() -> wrapper.isAlive());

            // Extra settle time after the liveness check; restore the interrupt
            // flag if this worker thread is interrupted while sleeping.
            try {
              TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
            LOGGER.info("Node {} restarted.", wrapper.getId());
          });
}

private void setConfigNodeKillPoints(
KeySetView<String, Boolean> killConfigNodeKeywords, Consumer<KillPointContext> action) {
EnvFactory.getEnv()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.itbase.exception.InconsistentDataException;
import org.apache.iotdb.rpc.TSStatusCode;

Expand Down Expand Up @@ -88,7 +89,8 @@ public void successTest(
final int configNodeNum,
final int dataNodeNum,
final int removeDataNodeNum,
final int dataRegionPerDataNode)
final int dataRegionPerDataNode,
final boolean rejoinRemovedDataNode)
throws Exception {
testRemoveDataNode(
dataReplicateFactor,
Expand All @@ -97,7 +99,8 @@ public void successTest(
dataNodeNum,
removeDataNodeNum,
dataRegionPerDataNode,
true);
true,
rejoinRemovedDataNode);
}

public void failTest(
Expand All @@ -106,7 +109,8 @@ public void failTest(
final int configNodeNum,
final int dataNodeNum,
final int removeDataNodeNum,
final int dataRegionPerDataNode)
final int dataRegionPerDataNode,
final boolean rejoinRemovedDataNode)
throws Exception {
testRemoveDataNode(
dataReplicateFactor,
Expand All @@ -115,7 +119,8 @@ public void failTest(
dataNodeNum,
removeDataNodeNum,
dataRegionPerDataNode,
false);
false,
rejoinRemovedDataNode);
}

public void testRemoveDataNode(
Expand All @@ -125,7 +130,8 @@ public void testRemoveDataNode(
final int dataNodeNum,
final int removeDataNodeNum,
final int dataRegionPerDataNode,
final boolean expectRemoveSuccess)
final boolean expectRemoveSuccess,
final boolean rejoinRemovedDataNode)
throws Exception {
// Set up the environment
EnvFactory.getEnv()
Expand All @@ -147,6 +153,13 @@ public void testRemoveDataNode(

ResultSet result = statement.executeQuery(SHOW_REGIONS);
Map<Integer, Set<Integer>> regionMap = getRegionMap(result);
regionMap.forEach(
(key, valueSet) -> {
LOGGER.info("Key: {}, Value: {}", key, valueSet);
if (valueSet.size() != dataReplicateFactor) {
Assert.fail();
}
});

// Get all data nodes
result = statement.executeQuery(SHOW_DATANODES);
Expand All @@ -159,6 +172,11 @@ public void testRemoveDataNode(
final Set<Integer> removeDataNodes =
selectRemoveDataNodes(allDataNodeId, regionMap, removeDataNodeNum);

List<DataNodeWrapper> removeDataNodeWrappers =
removeDataNodes.stream()
.map(dataNodeId -> EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).get())
.collect(Collectors.toList());

AtomicReference<SyncConfigNodeIServiceClient> clientRef = new AtomicReference<>(client);
List<TDataNodeLocation> removeDataNodeLocations =
clientRef
Expand Down Expand Up @@ -204,8 +222,46 @@ public void testRemoveDataNode(
}

LOGGER.info("Remove DataNodes success");
} catch (InconsistentDataException ignored) {

if (rejoinRemovedDataNode) {
try {
// Use sleep and restart to ensure that removeDataNodes restarts successfully
Thread.sleep(30000);
restartDataNodes(removeDataNodeWrappers);
LOGGER.info("RemoveDataNodes:{} rejoined successfully.", removeDataNodes);
OneSizeFitsQuorum marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
LOGGER.error("RemoveDataNodes rejoin failed.");
Assert.fail();
}
}
} catch (InconsistentDataException e) {
LOGGER.error("Unexpected error:", e);
}

try (final Connection connection = closeQuietly(EnvFactory.getEnv().getConnection());
final Statement statement = closeQuietly(connection.createStatement())) {

// Check the data region distribution after removing data nodes
ResultSet result = statement.executeQuery(SHOW_REGIONS);
Map<Integer, Set<Integer>> afterRegionMap = getRegionMap(result);
afterRegionMap.forEach(
(key, valueSet) -> {
LOGGER.info("Key: {}, Value: {}", key, valueSet);
if (valueSet.size() != dataReplicateFactor) {
Assert.fail();
}
});

if (rejoinRemovedDataNode) {
result = statement.executeQuery(SHOW_DATANODES);
Set<Integer> allDataNodeId = new HashSet<>();
while (result.next()) {
allDataNodeId.add(result.getInt(ColumnHeaderConstant.NODE_ID));
}
Assert.assertEquals(allDataNodeId.size(), dataNodeNum);
}
} catch (InconsistentDataException e) {
LOGGER.error("Unexpected error:", e);
}
}

Expand Down Expand Up @@ -285,4 +341,28 @@ private static void awaitUntilSuccess(

LOGGER.info("DataNodes has been successfully changed to {}", lastTimeDataNodeLocations.get());
}

/**
 * Forcibly restarts every given DataNode in parallel. Each node is killed via
 * {@code stopForcibly()}, awaited until down, started again, awaited until alive, and then
 * given a short settle period before this method returns for that node.
 *
 * @param dataNodeWrappers the DataNode processes to force-restart
 */
public void restartDataNodes(List<DataNodeWrapper> dataNodeWrappers) {
  dataNodeWrappers.parallelStream()
      .forEach(
          dataNode -> {
            // Kill the process and wait (max 1 minute, 2 s poll delay) for it to die.
            dataNode.stopForcibly();
            Awaitility.await()
                .pollDelay(2, TimeUnit.SECONDS)
                .atMost(1, TimeUnit.MINUTES)
                .until(() -> !dataNode.isAlive());
            LOGGER.info("Node {} stopped.", dataNode.getId());

            // Relaunch and wait for the liveness probe to succeed.
            dataNode.start();
            Awaitility.await()
                .pollDelay(2, TimeUnit.SECONDS)
                .atMost(1, TimeUnit.MINUTES)
                .until(() -> dataNode.isAlive());

            // Give the node 10 extra seconds to settle; preserve the interrupt
            // status if the sleeping worker is interrupted.
            try {
              TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
            LOGGER.info("Node {} restarted.", dataNode.getId());
          });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@
/**
 * Normal-path IT for removing DataNodes from a cluster; delegates to
 * {@link IoTDBRemoveDataNodeITFramework} for the actual setup/remove/verify flow.
 */
public class IoTDBRemoveDataNodeNormalIT extends IoTDBRemoveDataNodeITFramework {
  @Test
  public void success1C4DTest() throws Exception {
    // Expected to succeed: 1 ConfigNode / 4 DataNodes (per test name), removing 1 DataNode.
    // Args map to the framework's successTest signature: dataReplicateFactor=2,
    // second arg=3 (presumably schemaReplicateFactor — signature not fully visible here,
    // confirm in framework), configNodeNum=1, dataNodeNum=4, removeDataNodeNum=1,
    // dataRegionPerDataNode=2, rejoinRemovedDataNode=true (the removed node restarts
    // and rejoins the cluster afterwards).
    successTest(2, 3, 1, 4, 1, 2, true);
  }

  @Test
  public void fail1C3DTest() throws Exception {
    // Expected to fail: with replication factor 2 and only 3 DataNodes, removing one
    // leaves too few nodes. Same argument order as above, with dataNodeNum=3 and
    // rejoinRemovedDataNode=false (the removed node is not restarted).
    failTest(2, 3, 1, 3, 1, 2, false);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

public enum CnToDnAsyncRequestType {
// Node Maintenance
STOP_DATA_NODE,
STOP_AND_CLEAR_DATA_NODE,
CLEAN_DATA_NODE_CACHE,
FLUSH,
MERGE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,9 @@ protected void initActionMapBuilder() {
client.cleanDataNodeCache(
(TCleanDataNodeCacheReq) req, (DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.STOP_DATA_NODE,
(req, client, handler) -> client.stopDataNode((DataNodeTSStatusRPCHandler) handler));
CnToDnAsyncRequestType.STOP_AND_CLEAR_DATA_NODE,
(req, client, handler) ->
client.stopAndClearDataNode((DataNodeTSStatusRPCHandler) handler));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public static DataNodeAsyncRequestRPCHandler<?> buildHandler(
case CLEAR_CACHE:
case INVALIDATE_LAST_CACHE:
case CLEAN_DATA_NODE_CACHE:
case STOP_DATA_NODE:
case STOP_AND_CLEAR_DATA_NODE:
case START_REPAIR_DATA:
case STOP_REPAIR_DATA:
case LOAD_CONFIGURATION:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
public enum CnToDnSyncRequestType {
// Node Maintenance
CLEAN_DATA_NODE_CACHE,
STOP_DATA_NODE,
STOP_AND_CLEAR_DATA_NODE,
SET_SYSTEM_STATUS,
SHOW_CONFIGURATION,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ private void buildActionMap() {
CnToDnSyncRequestType.CLEAN_DATA_NODE_CACHE,
(req, client) -> client.cleanDataNodeCache((TCleanDataNodeCacheReq) req));
actionMapBuilder.put(
CnToDnSyncRequestType.STOP_DATA_NODE, (req, client) -> client.stopDataNode());
CnToDnSyncRequestType.STOP_AND_CLEAR_DATA_NODE,
(req, client) -> client.stopAndClearDataNode());
actionMapBuilder.put(
CnToDnSyncRequestType.SET_SYSTEM_STATUS,
(req, client) -> client.setSystemStatus((String) req));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ public boolean removeAINode(RemoveAINodePlan removeAINodePlan) {
public TSStatus checkRemoveDataNodes(List<TDataNodeLocation> dataNodeLocations) {
// 1. Only one RemoveDataNodesProcedure is allowed in the cluster
Optional<Procedure<ConfigNodeProcedureEnv>> anotherRemoveProcedure =
this.executor.getProcedures().values().stream()
getExecutor().getProcedures().values().stream()
.filter(
procedure -> {
if (procedure instanceof RemoveDataNodesProcedure) {
Expand All @@ -638,11 +638,10 @@ public TSStatus checkRemoveDataNodes(List<TDataNodeLocation> dataNodeLocations)
}

// 2. Check if the RemoveDataNodesProcedure conflicts with the RegionMigrateProcedure
RemoveDataNodeHandler removeDataNodeHandler = env.getRemoveDataNodeManager();
Set<TConsensusGroupId> removedDataNodesRegionSet =
removeDataNodeHandler.getRemovedDataNodesRegionSet(dataNodeLocations);
getEnv().getRemoveDataNodeHandler().getRemovedDataNodesRegionSet(dataNodeLocations);
Optional<Procedure<ConfigNodeProcedureEnv>> conflictRegionMigrateProcedure =
this.executor.getProcedures().values().stream()
getExecutor().getProcedures().values().stream()
.filter(
procedure -> {
if (procedure instanceof RegionMigrateProcedure) {
Expand Down Expand Up @@ -673,7 +672,7 @@ public TSStatus checkRemoveDataNodes(List<TDataNodeLocation> dataNodeLocations)
// 3. Check if the RegionMigrateProcedure generated by RemoveDataNodesProcedure conflicts with
// each other
List<RegionMigrationPlan> regionMigrationPlans =
removeDataNodeHandler.getRegionMigrationPlans(dataNodeLocations);
getEnv().getRemoveDataNodeHandler().getRegionMigrationPlans(dataNodeLocations);
removedDataNodesRegionSet.clear();
for (RegionMigrationPlan regionMigrationPlan : regionMigrationPlans) {
if (removedDataNodesRegionSet.contains(regionMigrationPlan.getRegionId())) {
Expand Down Expand Up @@ -716,7 +715,7 @@ private TSStatus checkRegionMigrate(
String failMessage = null;
// 1. Check if the RegionMigrateProcedure has conflict with another RegionMigrateProcedure
Optional<Procedure<ConfigNodeProcedureEnv>> anotherMigrateProcedure =
this.executor.getProcedures().values().stream()
getExecutor().getProcedures().values().stream()
.filter(
procedure -> {
if (procedure instanceof RegionMigrateProcedure) {
Expand Down Expand Up @@ -797,7 +796,7 @@ private TSStatus checkRegionMigrate(

// 2. Check if the RegionMigrateProcedure has conflict with RemoveDataNodesProcedure
Optional<Procedure<ConfigNodeProcedureEnv>> conflictRemoveDataNodesProcedure =
this.executor.getProcedures().values().stream()
getExecutor().getProcedures().values().stream()
.filter(
procedure -> {
if (procedure instanceof RemoveDataNodesProcedure) {
Expand All @@ -808,7 +807,7 @@ private TSStatus checkRegionMigrate(
.findAny();

if (conflictRemoveDataNodesProcedure.isPresent()) {
RemoveDataNodeHandler removeDataNodeHandler = env.getRemoveDataNodeManager();
RemoveDataNodeHandler removeDataNodeHandler = env.getRemoveDataNodeHandler();
List<TDataNodeLocation> removedDataNodes =
((RemoveDataNodesProcedure) conflictRemoveDataNodesProcedure.get()).getRemovedDataNodes();
Set<TConsensusGroupId> removedDataNodesRegionSet =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public void forceUpdateNodeCache(
default:
break;
}
loadCache.updateNodeStatistics();
loadCache.updateNodeStatistics(true);
eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
}

Expand All @@ -282,7 +282,7 @@ public void forceUpdateNodeCache(
*/
public void removeNodeCache(int nodeId) {
loadCache.removeNodeCache(nodeId);
loadCache.updateNodeStatistics();
loadCache.updateNodeStatistics(true);
eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected AbstractHeartbeatSample getLastSample() {
/**
* Update currentStatistics based on the latest heartbeat sample that cached in the slidingWindow.
*/
public abstract void updateCurrentStatistics();
public abstract void updateCurrentStatistics(boolean forceUpdate);

public AbstractStatistics getCurrentStatistics() {
return currentStatistics.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,10 @@ public void cacheConsensusSample(
}

/** Update the NodeStatistics of all Nodes. */
public void updateNodeStatistics() {
nodeCacheMap.values().forEach(BaseNodeCache::updateCurrentStatistics);
public void updateNodeStatistics(boolean forceUpdate) {
nodeCacheMap
.values()
.forEach(baseNodeCache -> baseNodeCache.updateCurrentStatistics(forceUpdate));
}

/** Update the RegionGroupStatistics of all RegionGroups. */
Expand All @@ -342,7 +344,9 @@ public void updateRegionGroupStatistics() {

/** Update the ConsensusGroupStatistics of all RegionGroups. */
public void updateConsensusGroupStatistics() {
consensusGroupCacheMap.values().forEach(ConsensusGroupCache::updateCurrentStatistics);
consensusGroupCacheMap
.values()
.forEach(consensusGroupCache -> consensusGroupCache.updateCurrentStatistics(false));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public ConsensusGroupCache() {
}

@Override
public synchronized void updateCurrentStatistics() {
public synchronized void updateCurrentStatistics(boolean forceUpdate) {
ConsensusGroupHeartbeatSample lastSample;
synchronized (slidingWindow) {
lastSample = (ConsensusGroupHeartbeatSample) getLastSample();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public AINodeHeartbeatCache(int aiNodeId) {
}

@Override
public void updateCurrentStatistics() {
public void updateCurrentStatistics(boolean forceUpdate) {
NodeHeartbeatSample lastSample = null;
synchronized (slidingWindow) {
if (!slidingWindow.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public ConfigNodeHeartbeatCache(int configNodeId, NodeStatistics statistics) {
}

@Override
public synchronized void updateCurrentStatistics() {
public synchronized void updateCurrentStatistics(boolean forceUpdate) {
// Skip itself and the Removing status can not be updated
if (nodeId == CURRENT_NODE_ID || NodeStatus.Removing.equals(getNodeStatus())) {
return;
Expand Down
Loading
Loading