diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 49c0685669a7..d116e4aec5b3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -20,6 +20,8 @@ package org.apache.iotdb.confignode.conf; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty; import org.apache.iotdb.commons.conf.IoTDBConstant; @@ -1197,4 +1199,11 @@ public TConfigNodeLocation generateLocalConfigNodeLocation() { new TEndPoint(getInternalAddress(), getInternalPort()), new TEndPoint(getInternalAddress(), getConsensusPort())); } + + public boolean isConsensusGroupStrongConsistency(TConsensusGroupId regionGroupId) { + return (TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType()) + && getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) + || (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) + && getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 8e02468004f9..3796f7ac2b76 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -30,6 +30,8 @@ import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.cluster.NodeType; import org.apache.iotdb.commons.cluster.RegionStatus; +import org.apache.iotdb.confignode.conf.ConfigNodeConfig; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.ProcedureManager; import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupCache; @@ -76,6 +78,8 @@ public class LoadCache { ProcedureManager.PROCEDURE_WAIT_TIME_OUT - TimeUnit.SECONDS.toMillis(2), TimeUnit.SECONDS.toMillis(10)); + private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); + // Map // False indicates there is no processing heartbeat request, true otherwise private final Map heartbeatProcessingMap; @@ -164,13 +168,16 @@ private void initRegionGroupHeartbeatCache( regionReplicaSets.forEach( regionReplicaSet -> { TConsensusGroupId regionGroupId = regionReplicaSet.getRegionId(); + boolean isStrongConsistency = + CONF.isConsensusGroupStrongConsistency(regionGroupId); regionGroupCacheMap.put( regionGroupId, new RegionGroupCache( database, regionReplicaSet.getDataNodeLocations().stream() .map(TDataNodeLocation::getDataNodeId) - .collect(Collectors.toSet()))); + .collect(Collectors.toSet()), + isStrongConsistency)); consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache()); })); } @@ -277,7 +284,9 @@ public void removeNodeCache(int nodeId) { */ public void createRegionGroupHeartbeatCache( String database, TConsensusGroupId regionGroupId, Set dataNodeIds) { - regionGroupCacheMap.put(regionGroupId, new RegionGroupCache(database, dataNodeIds)); + boolean isStrongConsistency = CONF.isConsensusGroupStrongConsistency(regionGroupId); + regionGroupCacheMap.put( + regionGroupId, new RegionGroupCache(database, dataNodeIds, isStrongConsistency)); consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache()); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java index 61a4c5446581..0ee3f7d2cf85 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java @@ -42,14 +42,16 @@ public class RegionGroupCache { private final Map regionCacheMap; // The current RegionGroupStatistics, used for providing statistics to other services private final AtomicReference currentStatistics; + private final boolean isStrongConsistency; /** Constructor for create RegionGroupCache with default RegionGroupStatistics. */ - public RegionGroupCache(String database, Set dataNodeIds) { + public RegionGroupCache(String database, Set dataNodeIds, boolean isStrongConsistency) { this.database = database; this.regionCacheMap = new ConcurrentHashMap<>(); dataNodeIds.forEach(dataNodeId -> regionCacheMap.put(dataNodeId, new RegionCache())); this.currentStatistics = new AtomicReference<>(RegionGroupStatistics.generateDefaultRegionGroupStatistics()); + this.isStrongConsistency = isStrongConsistency; } /** @@ -108,34 +110,31 @@ public void updateCurrentStatistics() { private RegionGroupStatus caculateRegionGroupStatus( Map regionStatisticsMap) { - int unknownCount = 0; - int readonlyCount = 0; + + int runningCount = 0; + int addingCount = 0; int removingCount = 0; for (RegionStatistics regionStatistics : regionStatisticsMap.values()) { - unknownCount += RegionStatus.Unknown.equals(regionStatistics.getRegionStatus()) ? 1 : 0; - readonlyCount += RegionStatus.ReadOnly.equals(regionStatistics.getRegionStatus()) ? 1 : 0; + runningCount += RegionStatus.Running.equals(regionStatistics.getRegionStatus()) ? 1 : 0; + addingCount += RegionStatus.Adding.equals(regionStatistics.getRegionStatus()) ? 1 : 0; removingCount += RegionStatus.Removing.equals(regionStatistics.getRegionStatus()) ? 1 : 0; } + int baseCount = regionCacheMap.size() - addingCount - removingCount; - if (unknownCount + readonlyCount + removingCount == 0) { - // The RegionGroup is considered as Running only if - // all Regions are in the Running status + if (runningCount == baseCount) { + // The RegionGroup is considered as Running only if all Regions are in the Running status. return RegionGroupStatus.Running; - } else if (readonlyCount == 0) { - return (unknownCount + removingCount) <= ((regionCacheMap.size() - 1) / 2) - // The RegionGroup is considered as Available when the number of Unknown Regions is less - // than half + } + if (isStrongConsistency) { + // For strong consistency algorithms, the RegionGroup is considered as Available when the + // number of Regions in the Running status is greater than half. + return runningCount > (baseCount / 2) ? RegionGroupStatus.Available - // Disabled otherwise : RegionGroupStatus.Disabled; } else { - return (unknownCount + readonlyCount + removingCount) <= ((regionCacheMap.size() - 1) / 2) - // The RegionGroup is considered as Discouraged when the number of Unknown or ReadOnly - // Regions is less - // than half, and there are at least 1 ReadOnly Region - ? RegionGroupStatus.Discouraged - // Disabled otherwise - : RegionGroupStatus.Disabled; + // For weak consistency algorithms, the RegionGroup is considered as Available when the number + // of Regions in the Running status is greater than or equal to 1. + return (runningCount >= 1) ? RegionGroupStatus.Available : RegionGroupStatus.Disabled; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java index 1b1c395777ae..a2c4bea6a736 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java @@ -24,24 +24,19 @@ public enum RegionGroupStatus { Running("Running", 1), /** - * All Regions in RegionGroup are in the Running or Unknown or Removing status, and the number of - * Regions in the Unknown or Removing status is less than half + * For strong consistency algorithms, the RegionGroup is considered as Available when the number + * of Regions in the Running status is greater than half. For weak consistency algorithms, the + * RegionGroup is considered as Available when the number of Regions in the Running status is + * greater than or equal to 1. To avoid the impact of Removing and Adding region status on region + * group status evaluation, this status, which only occurs during region migration and + * reconstruction, can be excluded. The denominator uses the number of regions excluding Removing + * and Adding status, while the numerator uses regions in the Running status, ensuring high + * availability evaluation remains unaffected. */ Available("Available", 2), - /** - * All Regions in RegionGroup are in the Running, Unknown or ReadOnly or Removing status, and at - * least 1 node is in ReadOnly status, the number of Regions in the Unknown or ReadOnly or - * Removing status is less than half - */ - Discouraged("Discouraged", 3), - - /** - * The following cases will lead to Disabled RegionGroup: - * - *

1. More than half of the Regions are in Unknown or ReadOnly or Removing status - */ - Disabled("Disabled", 4); + /** In scenarios other than the two mentioned above. */ + Disabled("Disabled", 3); private final String status; private final int weight; diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java index afbdacfe26e7..fd62c31ae367 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java @@ -188,7 +188,7 @@ public void testRegionGroupCache() throws InterruptedException { Assert.assertEquals( new Pair<>( new RegionGroupStatistics(RegionGroupStatus.Running, allRunningRegionStatisticsMap), - new RegionGroupStatistics(RegionGroupStatus.Available, oneRemovingRegionStatisticsMap)), + new RegionGroupStatistics(RegionGroupStatus.Running, oneRemovingRegionStatisticsMap)), differentRegionGroupStatisticsMap.get(regionGroupId)); // Add and mark Region 3 as Adding int addDataNodeId = 3; @@ -203,8 +203,8 @@ public void testRegionGroupCache() throws InterruptedException { oneAddingRegionStatisticsMap.put(addDataNodeId, new RegionStatistics(RegionStatus.Adding)); Assert.assertEquals( new Pair<>( - new RegionGroupStatistics(RegionGroupStatus.Available, oneRemovingRegionStatisticsMap), - new RegionGroupStatistics(RegionGroupStatus.Available, oneAddingRegionStatisticsMap)), + new RegionGroupStatistics(RegionGroupStatus.Running, oneRemovingRegionStatisticsMap), + new RegionGroupStatistics(RegionGroupStatus.Running, oneAddingRegionStatisticsMap)), differentRegionGroupStatisticsMap.get(regionGroupId)); // Both Region 0 and 3 can't be updated LOAD_CACHE.cacheRegionHeartbeatSample( @@ -226,8 +226,8 @@ public void testRegionGroupCache() throws InterruptedException { oneRemovingRegionStatisticsMap.put(addDataNodeId, new RegionStatistics(RegionStatus.Running)); Assert.assertEquals( new Pair<>( - new RegionGroupStatistics(RegionGroupStatus.Available, oneAddingRegionStatisticsMap), - new RegionGroupStatistics(RegionGroupStatus.Available, oneRemovingRegionStatisticsMap)), + new RegionGroupStatistics(RegionGroupStatus.Running, oneAddingRegionStatisticsMap), + new RegionGroupStatistics(RegionGroupStatus.Running, oneRemovingRegionStatisticsMap)), differentRegionGroupStatisticsMap.get(regionGroupId)); // Removing process completed LOAD_MANAGER.removeRegionCache(regionGroupId, removeDataNodeId); @@ -237,7 +237,7 @@ public void testRegionGroupCache() throws InterruptedException { allRunningRegionStatisticsMap.put(addDataNodeId, new RegionStatistics(RegionStatus.Running)); Assert.assertEquals( new Pair<>( - new RegionGroupStatistics(RegionGroupStatus.Available, oneRemovingRegionStatisticsMap), + new RegionGroupStatistics(RegionGroupStatus.Running, oneRemovingRegionStatisticsMap), new RegionGroupStatistics(RegionGroupStatus.Running, allRunningRegionStatisticsMap)), differentRegionGroupStatisticsMap.get(regionGroupId)); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java index 71543903c57d..69f073aad8b5 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java @@ -37,7 +37,7 @@ public class RegionGroupCacheTest { public void getRegionStatusTest() { long currentTime = System.nanoTime(); RegionGroupCache regionGroupCache = - new RegionGroupCache(DATABASE, Stream.of(0, 1, 2, 3).collect(Collectors.toSet())); + new RegionGroupCache(DATABASE, Stream.of(0, 1, 2, 3, 4).collect(Collectors.toSet()), false); regionGroupCache.cacheHeartbeatSample( 0, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); regionGroupCache.cacheHeartbeatSample( @@ -46,6 +46,8 @@ public void getRegionStatusTest() { 2, new RegionHeartbeatSample(currentTime, RegionStatus.Removing)); regionGroupCache.cacheHeartbeatSample( 3, new RegionHeartbeatSample(currentTime, RegionStatus.ReadOnly)); + regionGroupCache.cacheHeartbeatSample( + 4, new RegionHeartbeatSample(currentTime, RegionStatus.Adding)); regionGroupCache.updateCurrentStatistics(); Assert.assertEquals( @@ -56,74 +58,110 @@ public void getRegionStatusTest() { RegionStatus.Removing, regionGroupCache.getCurrentStatistics().getRegionStatus(2)); Assert.assertEquals( RegionStatus.ReadOnly, regionGroupCache.getCurrentStatistics().getRegionStatus(3)); + Assert.assertEquals( + RegionStatus.Adding, regionGroupCache.getCurrentStatistics().getRegionStatus(4)); } @Test - public void getRegionGroupStatusTest() { + public void weakConsistencyRegionGroupStatusTest() { long currentTime = System.nanoTime(); - RegionGroupCache runningRegionGroup = - new RegionGroupCache(DATABASE, Stream.of(0, 1, 2).collect(Collectors.toSet())); - runningRegionGroup.cacheHeartbeatSample( + RegionGroupCache regionGroupCache = + new RegionGroupCache(DATABASE, Stream.of(0, 1, 2).collect(Collectors.toSet()), false); + regionGroupCache.cacheHeartbeatSample( 0, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); - runningRegionGroup.cacheHeartbeatSample( + regionGroupCache.cacheHeartbeatSample( 1, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); - runningRegionGroup.cacheHeartbeatSample( + regionGroupCache.cacheHeartbeatSample( 2, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); - runningRegionGroup.updateCurrentStatistics(); + regionGroupCache.updateCurrentStatistics(); Assert.assertEquals( - RegionGroupStatus.Running, - runningRegionGroup.getCurrentStatistics().getRegionGroupStatus()); + RegionGroupStatus.Running, regionGroupCache.getCurrentStatistics().getRegionGroupStatus()); - RegionGroupCache availableRegionGroup = - new RegionGroupCache(DATABASE, Stream.of(0, 1, 2).collect(Collectors.toSet())); - availableRegionGroup.cacheHeartbeatSample( - 0, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); - availableRegionGroup.cacheHeartbeatSample( + regionGroupCache.cacheHeartbeatSample( + 0, new RegionHeartbeatSample(currentTime, RegionStatus.Unknown)); + regionGroupCache.updateCurrentStatistics(); + Assert.assertEquals( + RegionGroupStatus.Available, + regionGroupCache.getCurrentStatistics().getRegionGroupStatus()); + + regionGroupCache.cacheHeartbeatSample( 1, new RegionHeartbeatSample(currentTime, RegionStatus.Unknown)); - availableRegionGroup.cacheHeartbeatSample( - 2, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); - availableRegionGroup.updateCurrentStatistics(); + regionGroupCache.updateCurrentStatistics(); Assert.assertEquals( RegionGroupStatus.Available, - availableRegionGroup.getCurrentStatistics().getRegionGroupStatus()); + regionGroupCache.getCurrentStatistics().getRegionGroupStatus()); - RegionGroupCache disabledRegionGroup0 = - new RegionGroupCache(DATABASE, Stream.of(0, 1, 2).collect(Collectors.toSet())); - disabledRegionGroup0.cacheHeartbeatSample( + regionGroupCache.cacheHeartbeatSample( + 2, new RegionHeartbeatSample(currentTime, RegionStatus.Unknown)); + regionGroupCache.updateCurrentStatistics(); + Assert.assertEquals( + RegionGroupStatus.Disabled, regionGroupCache.getCurrentStatistics().getRegionGroupStatus()); + } + + @Test + public void strongConsistencyRegionGroupStatusTest() { + long currentTime = System.nanoTime(); + RegionGroupCache regionGroupCache = + new RegionGroupCache(DATABASE, Stream.of(0, 1, 2).collect(Collectors.toSet()), true); + regionGroupCache.cacheHeartbeatSample( 0, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); - disabledRegionGroup0.cacheHeartbeatSample( - 1, new RegionHeartbeatSample(currentTime, RegionStatus.ReadOnly)); - disabledRegionGroup0.cacheHeartbeatSample( + regionGroupCache.cacheHeartbeatSample( + 1, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); + regionGroupCache.cacheHeartbeatSample( 2, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); - disabledRegionGroup0.updateCurrentStatistics(); + regionGroupCache.updateCurrentStatistics(); Assert.assertEquals( - RegionGroupStatus.Discouraged, - disabledRegionGroup0.getCurrentStatistics().getRegionGroupStatus()); + RegionGroupStatus.Running, regionGroupCache.getCurrentStatistics().getRegionGroupStatus()); - RegionGroupCache disabledRegionGroup1 = - new RegionGroupCache(DATABASE, Stream.of(0, 1, 2).collect(Collectors.toSet())); - disabledRegionGroup1.cacheHeartbeatSample( - 0, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); - disabledRegionGroup1.cacheHeartbeatSample( + regionGroupCache.cacheHeartbeatSample( + 0, new RegionHeartbeatSample(currentTime, RegionStatus.Unknown)); + regionGroupCache.updateCurrentStatistics(); + Assert.assertEquals( + RegionGroupStatus.Available, + regionGroupCache.getCurrentStatistics().getRegionGroupStatus()); + + regionGroupCache.cacheHeartbeatSample( 1, new RegionHeartbeatSample(currentTime, RegionStatus.Unknown)); - disabledRegionGroup1.cacheHeartbeatSample( + regionGroupCache.updateCurrentStatistics(); + Assert.assertEquals( + RegionGroupStatus.Disabled, regionGroupCache.getCurrentStatistics().getRegionGroupStatus()); + + regionGroupCache.cacheHeartbeatSample( 2, new RegionHeartbeatSample(currentTime, RegionStatus.Unknown)); - disabledRegionGroup1.updateCurrentStatistics(); + regionGroupCache.updateCurrentStatistics(); Assert.assertEquals( - RegionGroupStatus.Disabled, - disabledRegionGroup1.getCurrentStatistics().getRegionGroupStatus()); + RegionGroupStatus.Disabled, regionGroupCache.getCurrentStatistics().getRegionGroupStatus()); + } - RegionGroupCache disabledRegionGroup2 = - new RegionGroupCache(DATABASE, Stream.of(0, 1, 2).collect(Collectors.toSet())); - disabledRegionGroup2.cacheHeartbeatSample( + @Test + public void migrateRegionRegionGroupStatusTest() { + long currentTime = System.nanoTime(); + RegionGroupCache regionGroupCache = + new RegionGroupCache(DATABASE, Stream.of(0).collect(Collectors.toSet()), true); + regionGroupCache.cacheHeartbeatSample( 0, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); - disabledRegionGroup2.cacheHeartbeatSample( - 1, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); - disabledRegionGroup2.cacheHeartbeatSample( - 2, new RegionHeartbeatSample(currentTime, RegionStatus.Removing)); - disabledRegionGroup2.updateCurrentStatistics(); + regionGroupCache.updateCurrentStatistics(); Assert.assertEquals( - RegionGroupStatus.Available, - disabledRegionGroup2.getCurrentStatistics().getRegionGroupStatus()); + RegionGroupStatus.Running, regionGroupCache.getCurrentStatistics().getRegionGroupStatus()); + + regionGroupCache = + new RegionGroupCache(DATABASE, Stream.of(0, 1).collect(Collectors.toSet()), true); + regionGroupCache.cacheHeartbeatSample( + 0, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); + regionGroupCache.cacheHeartbeatSample( + 1, new RegionHeartbeatSample(currentTime, RegionStatus.Adding)); + regionGroupCache.updateCurrentStatistics(); + Assert.assertEquals( + RegionGroupStatus.Running, regionGroupCache.getCurrentStatistics().getRegionGroupStatus()); + + regionGroupCache = + new RegionGroupCache(DATABASE, Stream.of(0, 1).collect(Collectors.toSet()), true); + regionGroupCache.cacheHeartbeatSample( + 0, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); + regionGroupCache.cacheHeartbeatSample( + 1, new RegionHeartbeatSample(currentTime, RegionStatus.Removing)); + regionGroupCache.updateCurrentStatistics(); + Assert.assertEquals( + RegionGroupStatus.Running, regionGroupCache.getCurrentStatistics().getRegionGroupStatus()); } }