Enhance Remove DataNode Test #13809

Merged: 11 commits, Oct 30, 2024
@@ -27,6 +27,7 @@
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.itbase.exception.InconsistentDataException;
import org.apache.iotdb.rpc.TSStatusCode;

@@ -50,7 +51,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateReliabilityITFramework.closeQuietly;
import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateReliabilityITFramework.getRegionMap;

public class IoTDBRemoveDataNodeITFramework {
@@ -88,7 +88,8 @@ public void successTest(
final int configNodeNum,
final int dataNodeNum,
final int removeDataNodeNum,
final int dataRegionPerDataNode)
final int dataRegionPerDataNode,
final boolean rejoinRemovedDataNode)
throws Exception {
testRemoveDataNode(
dataReplicateFactor,
@@ -97,7 +98,8 @@
dataNodeNum,
removeDataNodeNum,
dataRegionPerDataNode,
true);
true,
rejoinRemovedDataNode);
}

public void failTest(
@@ -106,7 +108,8 @@
final int configNodeNum,
final int dataNodeNum,
final int removeDataNodeNum,
final int dataRegionPerDataNode)
final int dataRegionPerDataNode,
final boolean rejoinRemovedDataNode)
throws Exception {
testRemoveDataNode(
dataReplicateFactor,
@@ -115,7 +118,8 @@
dataNodeNum,
removeDataNodeNum,
dataRegionPerDataNode,
false);
false,
rejoinRemovedDataNode);
}

public void testRemoveDataNode(
@@ -125,7 +129,8 @@
final int dataNodeNum,
final int removeDataNodeNum,
final int dataRegionPerDataNode,
final boolean expectRemoveSuccess)
final boolean expectRemoveSuccess,
final boolean rejoinRemovedDataNode)
throws Exception {
// Set up the environment
EnvFactory.getEnv()
@@ -137,8 +142,8 @@ public void testRemoveDataNode(
dataRegionPerDataNode * dataNodeNum / dataReplicateFactor);
EnvFactory.getEnv().initClusterEnvironment(configNodeNum, dataNodeNum);

try (final Connection connection = closeQuietly(EnvFactory.getEnv().getConnection());
final Statement statement = closeQuietly(connection.createStatement());
try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement();
SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {

@@ -159,6 +164,11 @@
final Set<Integer> removeDataNodes =
selectRemoveDataNodes(allDataNodeId, regionMap, removeDataNodeNum);

List<DataNodeWrapper> removeDataNodeWrappers =
removeDataNodes.stream()
.map(dataNodeId -> EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).get())
.collect(Collectors.toList());

AtomicReference<SyncConfigNodeIServiceClient> clientRef = new AtomicReference<>(client);
List<TDataNodeLocation> removeDataNodeLocations =
clientRef
@@ -204,6 +214,16 @@
}

LOGGER.info("Remove DataNodes success");

if (rejoinRemovedDataNode) {
try {
removeDataNodeWrappers.parallelStream().forEach(DataNodeWrapper::start);
LOGGER.info("RemoveDataNodes:{} rejoined successfully.", removeDataNodes);
} catch (Exception e) {
LOGGER.error("RemoveDataNodes rejoin failed.");
Assert.fail();
}
}
} catch (InconsistentDataException ignored) {

}
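The rejoin flow added above depends on ordering: the DataNodeWrapper handles are resolved from the DataNode ids before the removal is triggered, presumably while the id-to-wrapper mapping is still valid, and the wrappers are restarted in parallel only after the removal has succeeded. A condensed, commented excerpt of that flow follows; it reuses only the APIs visible in this diff (EnvFactory.getEnv().dataNodeIdToWrapper(int), DataNodeWrapper.start()) and is illustrative rather than a drop-in replacement for the test code:

    // Condensed excerpt of the test flow above, with explanatory comments.
    // 1. Resolve wrapper handles for the DataNodes that will be removed, while the
    //    dataNodeId -> DataNodeWrapper mapping is still available.
    List<DataNodeWrapper> removeDataNodeWrappers =
        removeDataNodes.stream()
            .map(dataNodeId -> EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).get())
            .collect(Collectors.toList());

    // 2. ... submit the remove-DataNode request and wait for it to succeed ...

    // 3. When rejoinRemovedDataNode is set, restart the removed processes in
    //    parallel; any restart failure fails the test immediately.
    if (rejoinRemovedDataNode) {
      try {
        removeDataNodeWrappers.parallelStream().forEach(DataNodeWrapper::start);
        LOGGER.info("Removed DataNodes {} rejoined successfully.", removeDataNodes);
      } catch (Exception e) {
        Assert.fail("Removed DataNodes failed to rejoin: " + e.getMessage());
      }
    }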
@@ -31,11 +31,11 @@
public class IoTDBRemoveDataNodeNormalIT extends IoTDBRemoveDataNodeITFramework {
@Test
public void success1C4DTest() throws Exception {
successTest(2, 3, 1, 4, 1, 2);
successTest(2, 3, 1, 4, 1, 2, true);
}

@Test
public void fail1C3DTest() throws Exception {
failTest(2, 3, 1, 3, 1, 2);
failTest(2, 3, 1, 3, 1, 2, false);
}
}
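With the extra boolean, the trailing five arguments of these calls map to configNodeNum, dataNodeNum, removeDataNodeNum, dataRegionPerDataNode and rejoinRemovedDataNode; the leading ones (starting with dataReplicateFactor) are truncated in this view. A hypothetical additional case, not part of this PR, that keeps the 1C4D cluster shape but leaves the removed DataNode offline might look like this:

    @Test
    public void success1C4DNoRejoinTest() throws Exception {
      // Hypothetical companion case: same 1C4D layout as success1C4DTest
      // (configNodeNum=1, dataNodeNum=4, removeDataNodeNum=1, dataRegionPerDataNode=2),
      // but rejoinRemovedDataNode=false so the removed DataNode stays offline.
      successTest(2, 3, 1, 4, 1, 2, false);
    }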
@@ -88,6 +88,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListWithTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackViewSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TStopDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTableReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
@@ -399,7 +400,8 @@ protected void initActionMapBuilder() {
(TCleanDataNodeCacheReq) req, (DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.STOP_DATA_NODE,
(req, client, handler) -> client.stopDataNode((DataNodeTSStatusRPCHandler) handler));
(req, client, handler) ->
client.stopDataNode((TStopDataNodeReq) req, (DataNodeTSStatusRPCHandler) handler));
}

@Override
@@ -38,6 +38,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
import org.apache.iotdb.mpp.rpc.thrift.TResetPeerListReq;
import org.apache.iotdb.mpp.rpc.thrift.TStopDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTableReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -104,7 +105,8 @@ private void buildActionMap() {
CnToDnSyncRequestType.CLEAN_DATA_NODE_CACHE,
(req, client) -> client.cleanDataNodeCache((TCleanDataNodeCacheReq) req));
actionMapBuilder.put(
CnToDnSyncRequestType.STOP_DATA_NODE, (req, client) -> client.stopDataNode());
CnToDnSyncRequestType.STOP_DATA_NODE,
(req, client) -> client.stopDataNode((TStopDataNodeReq) req));
actionMapBuilder.put(
CnToDnSyncRequestType.SET_SYSTEM_STATUS,
(req, client) -> client.setSystemStatus((String) req));
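Taken together, the async and sync request managers above now forward the request payload to stopDataNode instead of dropping it: the async entry casts to TStopDataNodeReq and also passes the RPC handler, while the sync entry casts and calls the client directly. The following self-contained toy, using hypothetical RequestType, StopRequest and Client types rather than the IoTDB API, sketches this cast-and-forward dispatch pattern:

    import java.util.EnumMap;
    import java.util.Map;
    import java.util.function.BiFunction;

    public class DispatchSketch {
      // Hypothetical stand-ins for CnToDnSyncRequestType, TStopDataNodeReq and the
      // Thrift client; only the dispatch shape matches the request managers above.
      enum RequestType { STOP_DATA_NODE, SET_SYSTEM_STATUS }

      record StopRequest(int dataNodeId) {}

      interface Client {
        String stopDataNode(StopRequest req);
        String setSystemStatus(String status);
      }

      static final Map<RequestType, BiFunction<Object, Client, String>> ACTIONS =
          new EnumMap<>(RequestType.class);

      static {
        // Each entry casts the untyped request object to its concrete type and
        // forwards it; before this change the STOP_DATA_NODE entry ignored req.
        ACTIONS.put(RequestType.STOP_DATA_NODE,
            (req, client) -> client.stopDataNode((StopRequest) req));
        ACTIONS.put(RequestType.SET_SYSTEM_STATUS,
            (req, client) -> client.setSystemStatus((String) req));
      }

      static String dispatch(RequestType type, Object req, Client client) {
        return ACTIONS.get(type).apply(req, client);
      }
    }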
@@ -614,7 +614,7 @@ public boolean removeAINode(RemoveAINodePlan removeAINodePlan) {
public TSStatus checkRemoveDataNodes(List<TDataNodeLocation> dataNodeLocations) {
// 1. Only one RemoveDataNodesProcedure is allowed in the cluster
Optional<Procedure<ConfigNodeProcedureEnv>> anotherRemoveProcedure =
this.executor.getProcedures().values().stream()
getExecutor().getProcedures().values().stream()
.filter(
procedure -> {
if (procedure instanceof RemoveDataNodesProcedure) {
@@ -638,11 +638,10 @@ public TSStatus checkRemoveDataNodes(List<TDataNodeLocation> dataNodeLocations)
}

// 2. Check if the RemoveDataNodesProcedure conflicts with the RegionMigrateProcedure
RemoveDataNodeHandler removeDataNodeHandler = env.getRemoveDataNodeManager();
Set<TConsensusGroupId> removedDataNodesRegionSet =
removeDataNodeHandler.getRemovedDataNodesRegionSet(dataNodeLocations);
getEnv().getRemoveDataNodeHandler().getRemovedDataNodesRegionSet(dataNodeLocations);
Optional<Procedure<ConfigNodeProcedureEnv>> conflictRegionMigrateProcedure =
this.executor.getProcedures().values().stream()
getExecutor().getProcedures().values().stream()
.filter(
procedure -> {
if (procedure instanceof RegionMigrateProcedure) {
@@ -673,7 +672,7 @@ public TSStatus checkRemoveDataNodes(List<TDataNodeLocation> dataNodeLocations)
// 3. Check if the RegionMigrateProcedure generated by RemoveDataNodesProcedure conflicts with
// each other
List<RegionMigrationPlan> regionMigrationPlans =
removeDataNodeHandler.getRegionMigrationPlans(dataNodeLocations);
getEnv().getRemoveDataNodeHandler().getRegionMigrationPlans(dataNodeLocations);
removedDataNodesRegionSet.clear();
for (RegionMigrationPlan regionMigrationPlan : regionMigrationPlans) {
if (removedDataNodesRegionSet.contains(regionMigrationPlan.getRegionId())) {
@@ -716,7 +715,7 @@ private TSStatus checkRegionMigrate(
String failMessage = null;
// 1. Check if the RegionMigrateProcedure has conflict with another RegionMigrateProcedure
Optional<Procedure<ConfigNodeProcedureEnv>> anotherMigrateProcedure =
this.executor.getProcedures().values().stream()
getExecutor().getProcedures().values().stream()
.filter(
procedure -> {
if (procedure instanceof RegionMigrateProcedure) {
@@ -797,7 +796,7 @@ private TSStatus checkRegionMigrate(

// 2. Check if the RegionMigrateProcedure has conflict with RemoveDataNodesProcedure
Optional<Procedure<ConfigNodeProcedureEnv>> conflictRemoveDataNodesProcedure =
this.executor.getProcedures().values().stream()
getExecutor().getProcedures().values().stream()
.filter(
procedure -> {
if (procedure instanceof RemoveDataNodesProcedure) {
@@ -808,7 +807,7 @@
.findAny();

if (conflictRemoveDataNodesProcedure.isPresent()) {
RemoveDataNodeHandler removeDataNodeHandler = env.getRemoveDataNodeManager();
RemoveDataNodeHandler removeDataNodeHandler = env.getRemoveDataNodeHandler();
List<TDataNodeLocation> removedDataNodes =
((RemoveDataNodesProcedure) conflictRemoveDataNodesProcedure.get()).getRemovedDataNodes();
Set<TConsensusGroupId> removedDataNodesRegionSet =
@@ -270,7 +270,7 @@ public void forceUpdateNodeCache(
default:
break;
}
loadCache.updateNodeStatistics();
loadCache.updateNodeStatistics(true);
eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
}

@@ -282,7 +282,7 @@ public void forceUpdateNodeCache(
*/
public void removeNodeCache(int nodeId) {
loadCache.removeNodeCache(nodeId);
loadCache.updateNodeStatistics();
loadCache.updateNodeStatistics(false);
eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
}
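The new boolean threads a force signal from these call sites through LoadCache.updateNodeStatistics(boolean) into the individual node caches: the periodic load-statistics update and removeNodeCache pass false, while forceUpdateNodeCache passes true, which, per the DataNodeHeartbeatCache change further down, lets a forced update refresh the statistics of a DataNode that is already in the Removing state. A minimal self-contained sketch of that guard, with hypothetical names standing in for the IoTDB classes:

    public class ForceUpdateSketch {
      enum NodeStatus { Running, Removing, Unknown }

      static class NodeCache {
        NodeStatus status = NodeStatus.Removing;
        long statisticsVersion = 0;

        // Mirrors the DataNodeHeartbeatCache guard: a Removing node keeps its
        // cached statistics unless the caller explicitly forces the update.
        void updateCurrentStatistics(boolean forceUpdate) {
          if (!forceUpdate && status == NodeStatus.Removing) {
            return;
          }
          statisticsVersion++; // stand-in for recomputing stats from heartbeats
        }
      }

      public static void main(String[] args) {
        NodeCache cache = new NodeCache();
        cache.updateCurrentStatistics(false); // periodic path: skipped while Removing
        cache.updateCurrentStatistics(true);  // forceUpdateNodeCache path: applied
        System.out.println(cache.statisticsVersion); // prints 1
      }
    }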

@@ -78,7 +78,7 @@ protected AbstractHeartbeatSample getLastSample() {
/**
* Update currentStatistics based on the latest heartbeat sample that cached in the slidingWindow.
*/
public abstract void updateCurrentStatistics();
public abstract void updateCurrentStatistics(boolean forceUpdate);

public AbstractStatistics getCurrentStatistics() {
return currentStatistics.get();
@@ -331,8 +331,10 @@ public void cacheConsensusSample(
}

/** Update the NodeStatistics of all Nodes. */
public void updateNodeStatistics() {
nodeCacheMap.values().forEach(BaseNodeCache::updateCurrentStatistics);
public void updateNodeStatistics(boolean forceUpdate) {
nodeCacheMap
.values()
.forEach(baseNodeCache -> baseNodeCache.updateCurrentStatistics(forceUpdate));
}

/** Update the RegionGroupStatistics of all RegionGroups. */
@@ -342,7 +344,9 @@ public void updateRegionGroupStatistics() {

/** Update the ConsensusGroupStatistics of all RegionGroups. */
public void updateConsensusGroupStatistics() {
consensusGroupCacheMap.values().forEach(ConsensusGroupCache::updateCurrentStatistics);
consensusGroupCacheMap
.values()
.forEach(consensusGroupCache -> consensusGroupCache.updateCurrentStatistics(false));
}

/**
@@ -36,7 +36,7 @@ public ConsensusGroupCache() {
}

@Override
public synchronized void updateCurrentStatistics() {
public synchronized void updateCurrentStatistics(boolean forceUpdate) {
ConsensusGroupHeartbeatSample lastSample;
synchronized (slidingWindow) {
lastSample = (ConsensusGroupHeartbeatSample) getLastSample();
@@ -34,7 +34,7 @@ public AINodeHeartbeatCache(int aiNodeId) {
}

@Override
public void updateCurrentStatistics() {
public void updateCurrentStatistics(boolean forceUpdate) {
NodeHeartbeatSample lastSample = null;
synchronized (slidingWindow) {
if (!slidingWindow.isEmpty()) {
@@ -44,7 +44,7 @@ public ConfigNodeHeartbeatCache(int configNodeId, NodeStatistics statistics) {
}

@Override
public synchronized void updateCurrentStatistics() {
public synchronized void updateCurrentStatistics(boolean forceUpdate) {
// Skip itself and the Removing status can not be updated
if (nodeId == CURRENT_NODE_ID || NodeStatus.Removing.equals(getNodeStatus())) {
return;
@@ -37,9 +37,9 @@ public DataNodeHeartbeatCache(int dataNodeId) {
}

@Override
public synchronized void updateCurrentStatistics() {
public synchronized void updateCurrentStatistics(boolean forceUpdate) {
// The Removing status can not be updated
if (NodeStatus.Removing.equals(getNodeStatus())) {
if (!forceUpdate && NodeStatus.Removing.equals(getNodeStatus())) {
return;
}

@@ -34,7 +34,7 @@ public RegionCache() {
}

@Override
public synchronized void updateCurrentStatistics() {
public synchronized void updateCurrentStatistics(boolean forceUpdate) {
RegionHeartbeatSample lastSample;
synchronized (slidingWindow) {
lastSample = (RegionHeartbeatSample) getLastSample();
@@ -94,7 +94,7 @@ public void removeRegionCache(int dataNodeId) {
* slidingWindow.
*/
public void updateCurrentStatistics() {
regionCacheMap.values().forEach(RegionCache::updateCurrentStatistics);
regionCacheMap.values().forEach(regionCache -> regionCache.updateCurrentStatistics(false));
Map<Integer, RegionStatistics> regionStatisticsMap =
regionCacheMap.entrySet().stream()
.collect(
@@ -82,7 +82,7 @@ public void stopLoadStatisticsService() {
}

private void updateLoadStatistics() {
loadCache.updateNodeStatistics();
loadCache.updateNodeStatistics(false);
loadCache.updateRegionGroupStatistics();
loadCache.updateConsensusGroupStatistics();
}
@@ -383,7 +383,7 @@ public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
try {
// Checks if the RemoveDataNode request is valid
RemoveDataNodeHandler removeDataNodeHandler =
configManager.getProcedureManager().getEnv().getRemoveDataNodeManager();
configManager.getProcedureManager().getEnv().getRemoveDataNodeHandler();
DataNodeToStatusResp preCheckStatus =
removeDataNodeHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -841,7 +841,7 @@ public RegionMaintainHandler getRegionMaintainHandler() {
return regionMaintainHandler;
}

public RemoveDataNodeHandler getRemoveDataNodeManager() {
public RemoveDataNodeHandler getRemoveDataNodeHandler() {
return removeDataNodeHandler;
}
