Skip to content

Commit

Permalink
Region group status refactor (#14738) (#14760)
Browse files Browse the repository at this point in the history
* add isStrictConsensus

* add Test

* fix IT

* resolve comments

* rename

(cherry picked from commit cb4c05c)
  • Loading branch information
HxpSerein authored Jan 23, 2025
1 parent 1c1fa79 commit 99e4df4
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.iotdb.confignode.conf;

import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
import org.apache.iotdb.commons.conf.IoTDBConstant;
Expand Down Expand Up @@ -1197,4 +1199,11 @@ public TConfigNodeLocation generateLocalConfigNodeLocation() {
new TEndPoint(getInternalAddress(), getInternalPort()),
new TEndPoint(getInternalAddress(), getConsensusPort()));
}

public boolean isConsensusGroupStrongConsistency(TConsensusGroupId regionGroupId) {
return (TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType())
&& getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
|| (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
&& getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupCache;
Expand Down Expand Up @@ -76,6 +78,8 @@ public class LoadCache {
ProcedureManager.PROCEDURE_WAIT_TIME_OUT - TimeUnit.SECONDS.toMillis(2),
TimeUnit.SECONDS.toMillis(10));

private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();

// Map<NodeId, is heartbeat processing>
// False indicates there is no processing heartbeat request, true otherwise
private final Map<Integer, AtomicBoolean> heartbeatProcessingMap;
Expand Down Expand Up @@ -164,13 +168,16 @@ private void initRegionGroupHeartbeatCache(
regionReplicaSets.forEach(
regionReplicaSet -> {
TConsensusGroupId regionGroupId = regionReplicaSet.getRegionId();
boolean isStrongConsistency =
CONF.isConsensusGroupStrongConsistency(regionGroupId);
regionGroupCacheMap.put(
regionGroupId,
new RegionGroupCache(
database,
regionReplicaSet.getDataNodeLocations().stream()
.map(TDataNodeLocation::getDataNodeId)
.collect(Collectors.toSet())));
.collect(Collectors.toSet()),
isStrongConsistency));
consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache());
}));
}
Expand Down Expand Up @@ -277,7 +284,9 @@ public void removeNodeCache(int nodeId) {
*/
public void createRegionGroupHeartbeatCache(
String database, TConsensusGroupId regionGroupId, Set<Integer> dataNodeIds) {
regionGroupCacheMap.put(regionGroupId, new RegionGroupCache(database, dataNodeIds));
boolean isStrongConsistency = CONF.isConsensusGroupStrongConsistency(regionGroupId);
regionGroupCacheMap.put(
regionGroupId, new RegionGroupCache(database, dataNodeIds, isStrongConsistency));
consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,16 @@ public class RegionGroupCache {
private final Map<Integer, RegionCache> regionCacheMap;
// The current RegionGroupStatistics, used for providing statistics to other services
private final AtomicReference<RegionGroupStatistics> currentStatistics;
private final boolean isStrongConsistency;

/** Constructor for create RegionGroupCache with default RegionGroupStatistics. */
public RegionGroupCache(String database, Set<Integer> dataNodeIds) {
public RegionGroupCache(String database, Set<Integer> dataNodeIds, boolean isStrongConsistency) {
this.database = database;
this.regionCacheMap = new ConcurrentHashMap<>();
dataNodeIds.forEach(dataNodeId -> regionCacheMap.put(dataNodeId, new RegionCache()));
this.currentStatistics =
new AtomicReference<>(RegionGroupStatistics.generateDefaultRegionGroupStatistics());
this.isStrongConsistency = isStrongConsistency;
}

/**
Expand Down Expand Up @@ -108,34 +110,31 @@ public void updateCurrentStatistics() {

private RegionGroupStatus caculateRegionGroupStatus(
Map<Integer, RegionStatistics> regionStatisticsMap) {
int unknownCount = 0;
int readonlyCount = 0;

int runningCount = 0;
int addingCount = 0;
int removingCount = 0;
for (RegionStatistics regionStatistics : regionStatisticsMap.values()) {
unknownCount += RegionStatus.Unknown.equals(regionStatistics.getRegionStatus()) ? 1 : 0;
readonlyCount += RegionStatus.ReadOnly.equals(regionStatistics.getRegionStatus()) ? 1 : 0;
runningCount += RegionStatus.Running.equals(regionStatistics.getRegionStatus()) ? 1 : 0;
addingCount += RegionStatus.Adding.equals(regionStatistics.getRegionStatus()) ? 1 : 0;
removingCount += RegionStatus.Removing.equals(regionStatistics.getRegionStatus()) ? 1 : 0;
}
int baseCount = regionCacheMap.size() - addingCount - removingCount;

if (unknownCount + readonlyCount + removingCount == 0) {
// The RegionGroup is considered as Running only if
// all Regions are in the Running status
if (runningCount == baseCount) {
// The RegionGroup is considered as Running only if all Regions are in the Running status.
return RegionGroupStatus.Running;
} else if (readonlyCount == 0) {
return (unknownCount + removingCount) <= ((regionCacheMap.size() - 1) / 2)
// The RegionGroup is considered as Available when the number of Unknown Regions is less
// than half
}
if (isStrongConsistency) {
// For strong consistency algorithms, the RegionGroup is considered as Available when the
// number of Regions in the Running status is greater than half.
return runningCount > (baseCount / 2)
? RegionGroupStatus.Available
// Disabled otherwise
: RegionGroupStatus.Disabled;
} else {
return (unknownCount + readonlyCount + removingCount) <= ((regionCacheMap.size() - 1) / 2)
// The RegionGroup is considered as Discouraged when the number of Unknown or ReadOnly
// Regions is less
// than half, and there are at least 1 ReadOnly Region
? RegionGroupStatus.Discouraged
// Disabled otherwise
: RegionGroupStatus.Disabled;
// For weak consistency algorithms, the RegionGroup is considered as Available when the number
// of Regions in the Running status is greater than or equal to 1.
return (runningCount >= 1) ? RegionGroupStatus.Available : RegionGroupStatus.Disabled;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,19 @@ public enum RegionGroupStatus {
Running("Running", 1),

/**
* All Regions in RegionGroup are in the Running or Unknown or Removing status, and the number of
* Regions in the Unknown or Removing status is less than half
* For strong consistency algorithms, the RegionGroup is considered as Available when the number
* of Regions in the Running status is greater than half. For weak consistency algorithms, the
* RegionGroup is considered as Available when the number of Regions in the Running status is
* greater than or equal to 1. To avoid the impact of Removing and Adding region status on region
* group status evaluation, this status, which only occurs during region migration and
* reconstruction, can be excluded. The denominator uses the number of regions excluding Removing
* and Adding status, while the numerator uses regions in the Running status, ensuring high
* availability evaluation remains unaffected.
*/
Available("Available", 2),

/**
* All Regions in RegionGroup are in the Running, Unknown or ReadOnly or Removing status, and at
* least 1 node is in ReadOnly status, the number of Regions in the Unknown or ReadOnly or
* Removing status is less than half
*/
Discouraged("Discouraged", 3),

/**
* The following cases will lead to Disabled RegionGroup:
*
* <p>1. More than half of the Regions are in Unknown or ReadOnly or Removing status
*/
Disabled("Disabled", 4);
/** In scenarios other than the two mentioned above. */
Disabled("Disabled", 3);

private final String status;
private final int weight;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void testRegionGroupCache() throws InterruptedException {
Assert.assertEquals(
new Pair<>(
new RegionGroupStatistics(RegionGroupStatus.Running, allRunningRegionStatisticsMap),
new RegionGroupStatistics(RegionGroupStatus.Available, oneRemovingRegionStatisticsMap)),
new RegionGroupStatistics(RegionGroupStatus.Running, oneRemovingRegionStatisticsMap)),
differentRegionGroupStatisticsMap.get(regionGroupId));
// Add and mark Region 3 as Adding
int addDataNodeId = 3;
Expand All @@ -203,8 +203,8 @@ public void testRegionGroupCache() throws InterruptedException {
oneAddingRegionStatisticsMap.put(addDataNodeId, new RegionStatistics(RegionStatus.Adding));
Assert.assertEquals(
new Pair<>(
new RegionGroupStatistics(RegionGroupStatus.Available, oneRemovingRegionStatisticsMap),
new RegionGroupStatistics(RegionGroupStatus.Available, oneAddingRegionStatisticsMap)),
new RegionGroupStatistics(RegionGroupStatus.Running, oneRemovingRegionStatisticsMap),
new RegionGroupStatistics(RegionGroupStatus.Running, oneAddingRegionStatisticsMap)),
differentRegionGroupStatisticsMap.get(regionGroupId));
// Both Region 0 and 3 can't be updated
LOAD_CACHE.cacheRegionHeartbeatSample(
Expand All @@ -226,8 +226,8 @@ public void testRegionGroupCache() throws InterruptedException {
oneRemovingRegionStatisticsMap.put(addDataNodeId, new RegionStatistics(RegionStatus.Running));
Assert.assertEquals(
new Pair<>(
new RegionGroupStatistics(RegionGroupStatus.Available, oneAddingRegionStatisticsMap),
new RegionGroupStatistics(RegionGroupStatus.Available, oneRemovingRegionStatisticsMap)),
new RegionGroupStatistics(RegionGroupStatus.Running, oneAddingRegionStatisticsMap),
new RegionGroupStatistics(RegionGroupStatus.Running, oneRemovingRegionStatisticsMap)),
differentRegionGroupStatisticsMap.get(regionGroupId));
// Removing process completed
LOAD_MANAGER.removeRegionCache(regionGroupId, removeDataNodeId);
Expand All @@ -237,7 +237,7 @@ public void testRegionGroupCache() throws InterruptedException {
allRunningRegionStatisticsMap.put(addDataNodeId, new RegionStatistics(RegionStatus.Running));
Assert.assertEquals(
new Pair<>(
new RegionGroupStatistics(RegionGroupStatus.Available, oneRemovingRegionStatisticsMap),
new RegionGroupStatistics(RegionGroupStatus.Running, oneRemovingRegionStatisticsMap),
new RegionGroupStatistics(RegionGroupStatus.Running, allRunningRegionStatisticsMap)),
differentRegionGroupStatisticsMap.get(regionGroupId));
}
Expand Down
Loading

0 comments on commit 99e4df4

Please sign in to comment.