From 31ab613624cfe3cdb7a517d4753bb5a9f52a6b8c Mon Sep 17 00:00:00 2001 From: Ivan Bessonov Date: Tue, 10 Dec 2024 18:07:43 +0300 Subject: [PATCH] IGNITE-23413 Create RebalanceMinimumRequiredTimeProvider interface and its implementation (#4769) --- check-rules/spotbugs-excludes.xml | 10 + .../CatalogStorageProfileDescriptor.java | 18 + .../descriptors/CatalogZoneDescriptor.java | 15 + .../CatalogZoneDescriptorTest.java | 90 +++- .../RebalanceMinimumRequiredTimeProvider.java | 30 ++ ...alanceMinimumRequiredTimeProviderImpl.java | 295 +++++++++++ .../rebalance/RebalanceUtil.java | 7 +- .../BaseDistributionZoneManagerTest.java | 2 +- ...ceMinimumRequiredTimeProviderImplTest.java | 465 ++++++++++++++++++ 9 files changed, 918 insertions(+), 14 deletions(-) create mode 100644 modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProvider.java create mode 100644 modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProviderImpl.java create mode 100644 modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProviderImplTest.java diff --git a/check-rules/spotbugs-excludes.xml b/check-rules/spotbugs-excludes.xml index 018d2c904ab..6fd1b28dbc5 100644 --- a/check-rules/spotbugs-excludes.xml +++ b/check-rules/spotbugs-excludes.xml @@ -244,6 +244,16 @@ + + + + + + + + + + diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogStorageProfileDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogStorageProfileDescriptor.java index aa7103e509b..ef947f4056d 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogStorageProfileDescriptor.java +++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogStorageProfileDescriptor.java @@ -54,6 +54,24 @@ public String storageProfile() { return storageProfile; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + CatalogStorageProfileDescriptor that = (CatalogStorageProfileDescriptor) o; + return storageProfile.equals(that.storageProfile); + } + + @Override + public int hashCode() { + return storageProfile.hashCode(); + } + /** * Serializer for {@link CatalogStorageProfilesDescriptor}. */ diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptor.java index b3b739cd451..e2c13b572b0 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptor.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptor.java @@ -57,6 +57,21 @@ public class CatalogZoneDescriptor extends CatalogObjectDescriptor { */ private final ConsistencyMode consistencyMode; + /** + * Returns {@code true} if zone update will lead to assignments recalculation. + */ + public static boolean updateRequiresAssignmentsRecalculation(CatalogZoneDescriptor oldDescriptor, CatalogZoneDescriptor newDescriptor) { + if (oldDescriptor.updateToken() == newDescriptor.updateToken()) { + return false; + } + + return oldDescriptor.partitions != newDescriptor.partitions + || oldDescriptor.replicas != newDescriptor.replicas + || !oldDescriptor.filter.equals(newDescriptor.filter) + || !oldDescriptor.storageProfiles.profiles().equals(newDescriptor.storageProfiles.profiles()) + || oldDescriptor.consistencyMode != newDescriptor.consistencyMode; + } + /** * Constructs a distribution zone descriptor. 
* diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptorTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptorTest.java index c185fdc1c70..ffd1ef854ef 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptorTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptorTest.java @@ -17,31 +17,28 @@ package org.apache.ignite.internal.catalog.descriptors; +import static java.util.Collections.emptyList; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.fromParams; +import static org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor.updateRequiresAssignmentsRecalculation; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.startsWith; +import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.List; +import java.util.function.Function; +import org.apache.ignite.internal.catalog.Catalog; +import org.apache.ignite.internal.catalog.commands.AlterZoneCommand; +import org.apache.ignite.internal.catalog.commands.AlterZoneCommandBuilder; import org.apache.ignite.internal.catalog.commands.StorageProfileParams; +import org.apache.ignite.internal.catalog.storage.AlterZoneEntry; import org.junit.jupiter.api.Test; class CatalogZoneDescriptorTest { @Test void toStringContainsTypeAndFields() { - var descriptor = new CatalogZoneDescriptor( - 1, - "zone1", - 2, - 3, - 4, - 5, - 6, - "the-filter", - fromParams(List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE).build())), - ConsistencyMode.STRONG_CONSISTENCY - ); + var descriptor = createZoneDescriptor(); String toString = descriptor.toString(); @@ -57,4 +54,73 @@ 
void toStringContainsTypeAndFields() { assertThat(toString, containsString("storageProfiles=CatalogStorageProfilesDescriptor [")); assertThat(toString, containsString("consistencyMode=STRONG_CONSISTENCY")); } + + @Test + void testUpdateRequiresAssignmentsRecalculationAutoAdjust() { + doTestUpdateRequiresAssignmentsRecalculation(builder -> builder.dataNodesAutoAdjust(100), false); + } + + @Test + void testUpdateRequiresAssignmentsRecalculationAutoAdjustScaleUp() { + doTestUpdateRequiresAssignmentsRecalculation(builder -> builder.dataNodesAutoAdjustScaleUp(100), false); + } + + @Test + void testUpdateRequiresAssignmentsRecalculationAutoAdjustScaleDown() { + doTestUpdateRequiresAssignmentsRecalculation(builder -> builder.dataNodesAutoAdjustScaleDown(100), false); + } + + @Test + void testUpdateRequiresAssignmentsRecalculationFilter() { + doTestUpdateRequiresAssignmentsRecalculation(builder -> builder.filter("foo"), true); + } + + @Test + void testUpdateRequiresAssignmentsRecalculationReplicas() { + doTestUpdateRequiresAssignmentsRecalculation(builder -> builder.replicas(100500), true); + } + + @Test + void testUpdateRequiresAssignmentsRecalculationStorageProfiles() { + doTestUpdateRequiresAssignmentsRecalculation(builder -> builder.storageProfilesParams( + List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE + "2").build()) + ), true); + } + + private static void doTestUpdateRequiresAssignmentsRecalculation( + Function alter, + boolean expectedResult + ) { + var oldZone = createZoneDescriptor(); + + Catalog catalog = createCatalogWithSingleZone(oldZone); + + var alterZoneCommand = (AlterZoneCommand) alter.apply(AlterZoneCommand.builder().zoneName(oldZone.name())).build(); + + var alterZoneEntry = (AlterZoneEntry) alterZoneCommand.get(catalog).get(0); + alterZoneEntry.applyUpdate(catalog, oldZone.updateToken() + 1); + + CatalogZoneDescriptor newZone = alterZoneEntry.descriptor(); + + assertEquals(expectedResult, 
updateRequiresAssignmentsRecalculation(oldZone, newZone)); + } + + private static CatalogZoneDescriptor createZoneDescriptor() { + return new CatalogZoneDescriptor( + 1, + "zone1", + 2, + 3, + 4, + 5, + 6, + "the-filter", + fromParams(List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE).build())), + ConsistencyMode.STRONG_CONSISTENCY + ); + } + + private static Catalog createCatalogWithSingleZone(CatalogZoneDescriptor zone) { + return new Catalog(1, 1, 1, List.of(zone), emptyList(), zone.id()); + } } diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProvider.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProvider.java new file mode 100644 index 00000000000..4719e44b030 --- /dev/null +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProvider.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.distributionzones.rebalance; + +/** + * Interface required for catalog compaction to determine a version for compaction. + */ +@SuppressWarnings("InterfaceMayBeAnnotatedFunctional") +public interface RebalanceMinimumRequiredTimeProvider { + /** + * Returns the minimum time required for rebalance, or current timestamp if there are no active rebalances and there is a guarantee that + * all rebalances launched in the future will use catalog version corresponding to the current time or greater. + */ + long minimumRequiredTime(); +} diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProviderImpl.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProviderImpl.java new file mode 100644 index 00000000000..ffeb840e7b5 --- /dev/null +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProviderImpl.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.distributionzones.rebalance; + +import static java.lang.Math.min; +import static java.util.Collections.emptyMap; +import static java.util.stream.Collectors.toMap; +import static org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor.updateRequiresAssignmentsRecalculation; +import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX_BYTES; +import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.PENDING_CHANGE_TRIGGER_PREFIX_BYTES; +import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; +import org.apache.ignite.internal.catalog.Catalog; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.lang.ByteArray; +import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.partitiondistribution.Assignments; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.internal.util.Cursor; + +/** + * {@link RebalanceMinimumRequiredTimeProvider} implementation for the current implementation of assignments. Assumes that each table has + * its own assignments, but assignments within a zone are still somehow coordinated. 
+ */ +public class RebalanceMinimumRequiredTimeProviderImpl implements RebalanceMinimumRequiredTimeProvider { + private final MetaStorageManager metaStorageManager; + private final CatalogService catalogService; + + /** + * Constructor. + */ + public RebalanceMinimumRequiredTimeProviderImpl(MetaStorageManager metaStorageManager, CatalogService catalogService) { + this.metaStorageManager = metaStorageManager; + this.catalogService = catalogService; + } + + @Override + public long minimumRequiredTime() { + // Use the same revision to read all the data, in order to guarantee consistency of data. + long appliedRevision = metaStorageManager.appliedRevision(); + + // Ignore the real safe time, having a time associated with revision is enough. Also, acquiring real safe time would be + // unnecessarily more complicated due to handling of possible data races. + long metaStorageSafeTime = metaStorageManager.timestampByRevisionLocally(appliedRevision).longValue(); + + long minTimestamp = metaStorageSafeTime; + + Map> stableAssignments = readAssignments(STABLE_ASSIGNMENTS_PREFIX_BYTES, appliedRevision); + Map> pendingAssignments = readAssignments(PENDING_ASSIGNMENTS_PREFIX_BYTES, appliedRevision); + + Map pendingChangeTriggerRevisions = readPendingChangeTriggerRevisions( + PENDING_CHANGE_TRIGGER_PREFIX_BYTES, + appliedRevision + ); + + int earliestCatalogVersion = catalogService.earliestCatalogVersion(); + int latestCatalogVersion = catalogService.latestCatalogVersion(); + + Map tableIdToZoneIdMap = tableIdToZoneIdMap(earliestCatalogVersion, latestCatalogVersion); + Map> allZonesByTimestamp = allZonesByTimestamp( + earliestCatalogVersion, + latestCatalogVersion + ); + Map> allZonesByRevision = allZonesByRevision(allZonesByTimestamp); + Map zoneDeletionTimestamps = zoneDeletionTimestamps(earliestCatalogVersion, latestCatalogVersion); + + for (Map.Entry entry : tableIdToZoneIdMap.entrySet()) { + Integer tableId = entry.getKey(); + Integer zoneId = entry.getValue(); + + 
NavigableMap zoneDescriptors = allZonesByTimestamp.get(zoneId); + int zonePartitions = zoneDescriptors.lastEntry().getValue().partitions(); + + Long pendingChangeTriggerRevision = pendingChangeTriggerRevisions.get(tableId); + + // +-1 here is required for 2 reasons: + // - we need timestamp right before deletion, if zone is deleted, thus we must subtract 1; + // - we need a "metaStorageSafeTime" if zone is not deleted, without any subtractions. + long latestTimestamp = zoneDeletionTimestamps.getOrDefault(zoneId, metaStorageSafeTime + 1) - 1; + + long zoneRevision = pendingChangeTriggerRevision == null + ? zoneDescriptors.firstEntry().getValue().updateToken() + : pendingChangeTriggerRevision; + + NavigableMap map = allZonesByRevision.get(zoneId); + Map.Entry zone = map.floorEntry(zoneRevision); + long timestamp = metaStorageManager.timestampByRevisionLocally(zone.getValue().updateToken()).longValue(); + + timestamp = ceilTime(zoneDescriptors, timestamp, latestTimestamp); + + minTimestamp = min(minTimestamp, timestamp); + + // Having empty map instead of null simplifies the code that follows. + Map pendingTableAssignments = pendingAssignments.getOrDefault(tableId, emptyMap()); + + if (!pendingTableAssignments.isEmpty()) { + long pendingTimestamp = findProperTimestampForAssignments( + pendingTableAssignments.size() == zonePartitions + ? 
pendingTableAssignments + : stableAssignments.getOrDefault(tableId, emptyMap()), + zoneDescriptors, + latestTimestamp + ); + + minTimestamp = min(minTimestamp, pendingTimestamp); + } + } + + return minTimestamp; + } + + static Map> allZonesByRevision( + Map> allZones + ) { + return allZones.entrySet().stream().collect(toMap(Map.Entry::getKey, entry -> { + NavigableMap mapByRevision = new TreeMap<>(); + + for (CatalogZoneDescriptor zone : entry.getValue().values()) { + mapByRevision.put(zone.updateToken(), zone); + } + + return mapByRevision; + })); + } + + /** + * Detects all changes in zones configurations and arranges them in a convenient map. It maps {@code zoneId} into a sorted map, that + * contains a {@code alterTime -> zoneDescriptor} mapping. + */ + Map> allZonesByTimestamp( + int earliestCatalogVersion, + int latestCatalogVersion + ) { + Map> allZones = new HashMap<>(); + + for (int catalogVersion = earliestCatalogVersion; catalogVersion <= latestCatalogVersion; catalogVersion++) { + Catalog catalog = catalogService.catalog(catalogVersion); + + for (CatalogZoneDescriptor zone : catalog.zones()) { + NavigableMap map = allZones.computeIfAbsent(zone.id(), id -> new TreeMap<>()); + + if (map.isEmpty() || updateRequiresAssignmentsRecalculation(map.lastEntry().getValue(), zone)) { + map.put(catalog.time(), zone); + } + } + } + + return allZones; + } + + Map zoneDeletionTimestamps(int earliestCatalogVersion, int latestCatalogVersion) { + Set existingZoneIds = new HashSet<>(); + Map zoneDeletionTimestamps = new HashMap<>(); + + for (int catalogVersion = earliestCatalogVersion; catalogVersion <= latestCatalogVersion; catalogVersion++) { + Catalog catalog = catalogService.catalog(catalogVersion); + + for (Iterator iterator = existingZoneIds.iterator(); iterator.hasNext(); ) { + Integer zoneId = iterator.next(); + + if (catalog.zone(zoneId) == null) { + zoneDeletionTimestamps.put(zoneId, catalog.time()); + + iterator.remove(); + } + } + + for (CatalogZoneDescriptor 
zone : catalog.zones()) { + existingZoneIds.add(zone.id()); + } + } + + return zoneDeletionTimestamps; + } + + static long ceilTime(NavigableMap zoneDescriptors, long timestamp, long latestTimestamp) { + // We determine the "next" zone version, the one that comes after the version that corresponds to "timestamp". + // "ceilingKey" accepts an inclusive boundary, while we have an exclusive one. "+ 1" converts ">=" into ">". + Long ceilingKey = zoneDescriptors.ceilingKey(timestamp + 1); + + // While having it, we either decrement it to get "previous moment in time", or if there's no "next" version then we use "safeTime". + return ceilingKey == null ? latestTimestamp : ceilingKey - 1; + } + + /** + * Detects the specific zone version, associated with the most recent entry of the assignments set, and returns the highest possible + * timestamp that can be used to read that zone from the catalog. Returns {@code latestTimestamp} if that timestamp corresponds to + * {@code now}. + */ + static long findProperTimestampForAssignments( + Map assignments, + NavigableMap zoneDescriptors, + long latestTimestamp + ) { + long timestamp = assignments.values().stream() + .mapToLong(Assignments::timestamp) + // Use "max", assuming that older assignments are already processed in the past, they just had not been changed later. + .max() + // If assignments are empty, we shall use the earliest known timestamp for the zone. + .orElse(zoneDescriptors.firstEntry().getKey()); + + return ceilTime(zoneDescriptors, timestamp, latestTimestamp); + } + + /** + * Returns a {@code tableId -> zoneId} mapping for all tables that ever existed between two passed catalog versions, including the + * boundaries. 
+ */ + private Map tableIdToZoneIdMap(int earliestCatalogVersion, int latestCatalogVersion) { + Map tableIdToZoneIdMap = new HashMap<>(); + for (int catalogVersion = earliestCatalogVersion; catalogVersion <= latestCatalogVersion; catalogVersion++) { + Catalog catalog = catalogService.catalog(catalogVersion); + + for (CatalogTableDescriptor table : catalog.tables()) { + tableIdToZoneIdMap.putIfAbsent(table.id(), table.zoneId()); + } + } + return tableIdToZoneIdMap; + } + + /** + * Reads assignments from the metastorage locally. The resulting map is a {@code tableId -> {partitionId -> assignments}} mapping. + */ + Map> readAssignments(byte[] prefix, long appliedRevision) { + Map> assignments = new HashMap<>(); + + try (Cursor entries = readLocallyByPrefix(prefix, appliedRevision)) { + for (Entry entry : entries) { + if (entry.empty() || entry.tombstone()) { + continue; + } + + TablePartitionId tablePartitionId = RebalanceUtil.extractTablePartitionId(entry.key(), prefix); + int tableId = tablePartitionId.tableId(); + int partitionId = tablePartitionId.partitionId(); + + assignments.computeIfAbsent(tableId, id -> new HashMap<>()) + .put(partitionId, Assignments.fromBytes(entry.value())); + } + } + + return assignments; + } + + Map readPendingChangeTriggerRevisions(byte[] prefix, long appliedRevision) { + Map revisions = new HashMap<>(); + + try (Cursor entries = readLocallyByPrefix(prefix, appliedRevision)) { + for (Entry entry : entries) { + if (entry.empty() || entry.tombstone()) { + continue; + } + + int tableId = RebalanceUtil.extractTablePartitionId(entry.key(), prefix).tableId(); + + byte[] value = entry.value(); + long revision = ByteUtils.bytesToLongKeepingOrder(value); + + revisions.compute(tableId, (k, prev) -> prev == null ? 
revision : min(prev, revision)); + } + } + + return revisions; + } + + private Cursor readLocallyByPrefix(byte[] prefix, long revision) { + return metaStorageManager.prefixLocally(new ByteArray(prefix), revision); + } +} diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java index d7b007a430a..8434986f4f1 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java @@ -401,6 +401,11 @@ private static CompletableFuture tablePartitionAssignment( /** Key prefix for switch append assignments. */ public static final String ASSIGNMENTS_SWITCH_APPEND_PREFIX = "assignments.switch.append."; + /** Key prefix for change trigger keys. */ + public static final String PENDING_CHANGE_TRIGGER_PREFIX = "pending.change.trigger."; + + public static final byte[] PENDING_CHANGE_TRIGGER_PREFIX_BYTES = PENDING_CHANGE_TRIGGER_PREFIX.getBytes(UTF_8); + /** * Key that is needed for skipping stale events of pending key change. * @@ -409,7 +414,7 @@ private static CompletableFuture tablePartitionAssignment( * @see Rebalance documentation */ public static ByteArray pendingChangeTriggerKey(TablePartitionId partId) { - return new ByteArray("pending.change.trigger." 
+ partId); + return new ByteArray(PENDING_CHANGE_TRIGGER_PREFIX + partId); } /** diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java index c3261e9fe74..2b6fc7ed3b7 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java @@ -159,7 +159,7 @@ public void tearDown() throws Exception { closeAll(toCloseList); } - void startDistributionZoneManager() { + protected void startDistributionZoneManager() { assertThat( distributionZoneManager.startAsync(new ComponentContext()) .thenCompose(unused -> metaStorageManager.deployWatches()), diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProviderImplTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProviderImplTest.java new file mode 100644 index 00000000000..05884db3727 --- /dev/null +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProviderImplTest.java @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.distributionzones.rebalance; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import org.apache.ignite.catalog.annotations.Table; +import org.apache.ignite.internal.catalog.Catalog; +import org.apache.ignite.internal.catalog.commands.ColumnParams; +import org.apache.ignite.internal.catalog.commands.CreateTableCommand; +import org.apache.ignite.internal.catalog.commands.DropTableCommand; +import org.apache.ignite.internal.catalog.commands.TableHashPrimaryKey; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.distributionzones.BaseDistributionZoneManagerTest; +import org.apache.ignite.internal.lang.ByteArray; +import 
org.apache.ignite.internal.partitiondistribution.Assignment; +import org.apache.ignite.internal.partitiondistribution.Assignments; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.internal.util.CollectionUtils; +import org.apache.ignite.sql.ColumnType; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Test for {@link RebalanceMinimumRequiredTimeProviderImpl}. + */ +class RebalanceMinimumRequiredTimeProviderImplTest extends BaseDistributionZoneManagerTest { + private static final String TABLE_NAME = "tableName"; + + private static final String UPDATED_FILTER_1 = "$..*.*"; + private static final String UPDATED_FILTER_2 = "$..*.*.*"; + + private RebalanceMinimumRequiredTimeProvider minimumRequiredTimeProvider; + + @BeforeEach + void initProvider() { + minimumRequiredTimeProvider = new RebalanceMinimumRequiredTimeProviderImpl(metaStorageManager, catalogManager); + } + + private long getMinimumRequiredTime() { + long minimumRequiredTime = minimumRequiredTimeProvider.minimumRequiredTime(); + + assertThat(minimumRequiredTime, is(lessThanOrEqualTo(currentSafeTime()))); + + return minimumRequiredTime; + } + + /** + * "No tables" scenario means that no tables exist in zone. In this case, we should return safe time of latest meta-storage revision. + */ + @Test + void testNoTables() throws Exception { + startDistributionZoneManager(); + + // Wait for default zone update to be applied. + assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() > 0, 1000L)); + + long timeBefore = currentSafeTime(); + + long minimumRequiredTime = minimumRequiredTimeProvider.minimumRequiredTime(); + + long timeAfter = currentSafeTime(); + + // This check is fine, because there's no idle safe time propagation happening in these tests. 
Otherwise this check would fail + // randomly: provider's time corresponds to a revision, which might actually be earlier than the safe time we read. + assertThat(timeBefore, is(lessThanOrEqualTo(minimumRequiredTime))); + + assertThat(minimumRequiredTime, is(lessThanOrEqualTo(timeAfter))); + } + + /** + * Scenario where table exists, but assignments are not yet saved. In this case, we should return safe time that corresponds to earliest + * zone version. + */ + @Test + void testNoAssignments() throws Exception { + startDistributionZoneManager(); + + String defaultZoneName = getDefaultZone().name(); + + createTable(defaultZoneName); + + Catalog earliestCatalog = latestCatalogVersion(); + + alterZone(defaultZoneName, null, null, "$..*.*"); + Catalog latestCatalog = latestCatalogVersion(); + + long minimumRequiredTime = getMinimumRequiredTime(); + + assertThat(earliestCatalog.time(), is(lessThanOrEqualTo(minimumRequiredTime))); + assertThat(minimumRequiredTime, is(lessThan(latestCatalog.time()))); + } + + /** + * Scenario where table exists, its initial assignments are saved, and we updated the zone after that. In this case, we should return + * time, that corresponds to zone before its update. + */ + @Test + void testOldStableAssignments1() throws Exception { + startDistributionZoneManager(); + + String defaultZoneName = getDefaultZone().name(); + + int tableId = createTable(defaultZoneName); + + Catalog earliestCatalog = latestCatalogVersion(); + + alterZone(defaultZoneName, null, null, UPDATED_FILTER_1); + Catalog latestCatalog = latestCatalogVersion(); + + saveStableAssignments(defaultZoneName, tableId, earliestCatalog, true); + + long minimumRequiredTime = getMinimumRequiredTime(); + + assertThat(earliestCatalog.time(), is(lessThanOrEqualTo(minimumRequiredTime))); + assertThat(minimumRequiredTime, is(lessThan(latestCatalog.time()))); + } + + /** + * Scenario where table exists, its initial assignments are saved, and we updated the zone after that. 
In this case, we should return + * time, that corresponds to zone before its update. + */ + @Test + void testOldStableAssignments2() throws Exception { + startDistributionZoneManager(); + + String defaultZoneName = getDefaultZone().name(); + + int tableId = createTable(defaultZoneName); + + Catalog earliestCatalog = latestCatalogVersion(); + + alterZone(defaultZoneName, 1, 1, null); + Catalog intermediateCatalog = latestCatalogVersion(); + + alterZone(defaultZoneName, null, null, UPDATED_FILTER_1); + Catalog latestCatalog = latestCatalogVersion(); + + saveStableAssignments(defaultZoneName, tableId, earliestCatalog, true); + + long minimumRequiredTime = getMinimumRequiredTime(); + + assertThat(intermediateCatalog.time(), is(lessThanOrEqualTo(minimumRequiredTime))); + assertThat(minimumRequiredTime, is(lessThan(latestCatalog.time()))); + } + + /** + * Scenario where zone is already dropped, but there are still assignments. In this case we return latest available timestamp, in which + * zone is still present. + */ + @Test + void testStableAssignmentsDroppedZone() throws Exception { + startDistributionZoneManager(); + + String zoneName = "zoneName"; + createZone(zoneName, null, null, null); + + int tableId = createTable(zoneName); + + Catalog earliestCatalog = latestCatalogVersion(); + saveStableAssignments(zoneName, tableId, earliestCatalog, true); + + dropTable(TABLE_NAME); + dropZone(zoneName); + Catalog latestCatalog = latestCatalogVersion(); + + long minimumRequiredTime = getMinimumRequiredTime(); + + assertThat(earliestCatalog.time(), is(lessThanOrEqualTo(minimumRequiredTime))); + assertThat(minimumRequiredTime, is(lessThan(latestCatalog.time()))); + } + + /** + * Scenario where not all stable assignments are recalculated, and there are no pending assignments. Doesn't happen in real environment, + * test-only case. Shows that we use earliest zone version amongst known stable assignments. 
+     *
+     * <p>Stable assignments are first saved for all partitions using the earliest catalog version and then saved again, only for a
+     * subset of the partitions ({@code allPartitions == false}), using the latest catalog version.
+     */
+    @Test
+    void testNewStableAssignments() throws Exception {
+        startDistributionZoneManager();
+
+        String defaultZoneName = getDefaultZone().name();
+
+        int tableId = createTable(defaultZoneName);
+
+        Catalog earliestCatalog = latestCatalogVersion();
+
+        alterZone(defaultZoneName, null, null, UPDATED_FILTER_1);
+        Catalog latestCatalog = latestCatalogVersion();
+
+        saveStableAssignments(defaultZoneName, tableId, earliestCatalog, true);
+        saveStableAssignments(defaultZoneName, tableId, latestCatalog, false);
+
+        long minimumRequiredTime = getMinimumRequiredTime();
+
+        assertThat(earliestCatalog.time(), is(lessThanOrEqualTo(minimumRequiredTime)));
+        assertThat(minimumRequiredTime, is(lessThan(latestCatalog.time())));
+    }
+
+    /**
+     * Scenario where pending assignments are only partially saved. In this case, we're waiting for the rest of the pending assignments
+     * to appear and should use the timestamp that corresponds to the stable assignments.
+     *
+     * <p>Stable assignments are saved for all partitions using the earliest catalog version; pending assignments are saved only for a
+     * subset of the partitions ({@code allPartitions == false}) using the latest catalog version. The returned time must be at least
+     * the time of the earliest version and strictly less than the time of the latest one.
+     */
+    @Test
+    void testPendingAssignmentsNotAllPartitions() throws Exception {
+        startDistributionZoneManager();
+
+        String defaultZoneName = getDefaultZone().name();
+
+        int tableId = createTable(defaultZoneName);
+
+        Catalog earliestCatalog = latestCatalogVersion();
+
+        alterZone(defaultZoneName, null, null, UPDATED_FILTER_1);
+        Catalog latestCatalog = latestCatalogVersion();
+
+        saveStableAssignments(defaultZoneName, tableId, earliestCatalog, true);
+        savePendingAssignments(defaultZoneName, tableId, latestCatalog, false);
+
+        long minimumRequiredTime = getMinimumRequiredTime();
+
+        assertThat(earliestCatalog.time(), is(lessThanOrEqualTo(minimumRequiredTime)));
+        assertThat(minimumRequiredTime, is(lessThan(latestCatalog.time())));
+    }
+
+    /**
+     * Scenario where pending assignments are fully saved and we no longer need catalog version associated with stable assignments.
+     *
+     * <p>Only the lower bound is verified here: the returned time must be at least the time of the latest catalog version, which is
+     * the one the pending assignments are saved with.
+     */
+    @Test
+    void testPendingAssignmentsAllPartitions() throws Exception {
+        startDistributionZoneManager();
+
+        String defaultZoneName = getDefaultZone().name();
+
+        int tableId = createTable(defaultZoneName);
+
+        Catalog earliestCatalog = latestCatalogVersion();
+
+        alterZone(defaultZoneName, null, null, UPDATED_FILTER_1);
+        Catalog latestCatalog = latestCatalogVersion();
+
+        saveStableAssignments(defaultZoneName, tableId, earliestCatalog, true);
+        savePendingAssignments(defaultZoneName, tableId, latestCatalog, true);
+
+        long minimumRequiredTime = getMinimumRequiredTime();
+
+        assertThat(latestCatalog.time(), is(lessThanOrEqualTo(minimumRequiredTime)));
+    }
+
+    /**
+     * Scenario where planned assignments are only partially saved. Pending assignments are not fully saved either. This is a test-only
+     * case that shows that we will choose the timestamp that corresponds to the stable assignments, because the pending ones are still
+     * not fully updated.
+     *
+     * <p>Stable assignments use the earliest catalog version (all partitions), pending assignments use the intermediate version and
+     * planned assignments use the latest version (both only for a subset of the partitions, {@code allPartitions == false}). The
+     * returned time must be at least the time of the earliest version and strictly less than the time of the intermediate one.
+     */
+    @Test
+    void testPlannedAssignmentsNotAllPartitions() throws Exception {
+        startDistributionZoneManager();
+
+        String defaultZoneName = getDefaultZone().name();
+
+        int tableId = createTable(defaultZoneName);
+
+        Catalog earliestCatalog = latestCatalogVersion();
+
+        alterZone(defaultZoneName, null, null, UPDATED_FILTER_1);
+        Catalog intermediateCatalog = latestCatalogVersion();
+
+        alterZone(defaultZoneName, null, null, UPDATED_FILTER_2);
+        Catalog latestCatalog = latestCatalogVersion();
+
+        saveStableAssignments(defaultZoneName, tableId, earliestCatalog, true);
+        savePendingAssignments(defaultZoneName, tableId, intermediateCatalog, false);
+        savePlannedAssignments(defaultZoneName, tableId, latestCatalog, false);
+
+        long minimumRequiredTime = getMinimumRequiredTime();
+
+        assertThat(earliestCatalog.time(), is(lessThanOrEqualTo(minimumRequiredTime)));
+        assertThat(minimumRequiredTime, is(lessThan(intermediateCatalog.time())));
+    }
+
+    /**
+     * Scenario where planned assignments are fully saved.
+     * We should still use the timestamp associated with pending assignments, just in case we want to use it.
+     * Timestamp of pending assignments is considered to have a priority.
+     *
+     * <p>Stable, pending and planned assignments are saved for all partitions using the earliest, intermediate and latest catalog
+     * versions respectively. The returned time must be at least the time of the intermediate version and strictly less than the
+     * time of the latest one.
+     */
+    @Test
+    void testPlannedAssignmentsAllPartitions() throws Exception {
+        startDistributionZoneManager();
+
+        String defaultZoneName = getDefaultZone().name();
+
+        int tableId = createTable(defaultZoneName);
+
+        Catalog earliestCatalog = latestCatalogVersion();
+
+        alterZone(defaultZoneName, null, null, UPDATED_FILTER_1);
+        Catalog intermediateCatalog = latestCatalogVersion();
+
+        alterZone(defaultZoneName, null, null, UPDATED_FILTER_2);
+        Catalog latestCatalog = latestCatalogVersion();
+
+        saveStableAssignments(defaultZoneName, tableId, earliestCatalog, true);
+        savePendingAssignments(defaultZoneName, tableId, intermediateCatalog, true);
+        savePlannedAssignments(defaultZoneName, tableId, latestCatalog, true);
+
+        long minimumRequiredTime = getMinimumRequiredTime();
+
+        assertThat(intermediateCatalog.time(), is(lessThanOrEqualTo(minimumRequiredTime)));
+        assertThat(minimumRequiredTime, is(lessThan(latestCatalog.time())));
+    }
+
+    /** Returns the current safe time of the meta storage cluster time, converted to {@code long}. */
+    private long currentSafeTime() {
+        return metaStorageManager.clusterTime().currentSafeTime().longValue();
+    }
+
+    /**
+     * Returns the catalog of the latest catalog version, after waiting until its time is not greater than the current safe time.
+     *
+     * @return Latest catalog, never {@code null}.
+     * @throws Exception If waiting for the condition fails.
+     */
+    private Catalog latestCatalogVersion() throws Exception {
+        Catalog latestCatalog = catalogManager.catalog(catalogManager.latestCatalogVersion());
+
+        assertThat(latestCatalog, is(notNullValue()));
+
+        // The tests compare catalog times with the provider's result, so the safe time must have caught up with the catalog first.
+        assertTrue(waitForCondition(() -> latestCatalog.time() <= currentSafeTime(), 10, 5000));
+
+        return latestCatalog;
+    }
+
+    /**
+     * Creates table {@code TABLE_NAME} with a single {@code INT32} primary key column {@code "key"} in the given zone.
+     *
+     * @param zoneName Name of the zone to create the table in.
+     * @return ID of the created table.
+     * @throws Exception If the catalog command result cannot be obtained.
+     */
+    private int createTable(String zoneName) throws Exception {
+        CompletableFuture<Integer> tableFuture = catalogManager.execute(CreateTableCommand.builder()
+                .tableName(TABLE_NAME)
+                .schemaName(Table.DEFAULT_SCHEMA)
+                .zone(zoneName)
+                .columns(List.of(ColumnParams.builder().name("key").type(ColumnType.INT32).build()))
+                .primaryKey(TableHashPrimaryKey.builder().columns(List.of("key")).build())
+                .build()
+        );
+
+        assertThat(tableFuture, willCompleteSuccessfully());
+
+        // The command completes with the catalog version in which the table becomes visible.
+        int catalogVersion = tableFuture.get();
+
+        Catalog catalog = catalogManager.catalog(catalogVersion);
+        assertNotNull(catalog);
+
+        CatalogTableDescriptor table = catalog.table(Table.DEFAULT_SCHEMA, TABLE_NAME);
+        assertNotNull(table);
+
+        return table.id();
+    }
+
+    /**
+     * Drops the table with the given name from the default schema and asserts that the command completes successfully.
+     *
+     * @param tableName Name of the table to drop.
+     */
+    private void dropTable(String tableName) {
+        CompletableFuture<?> future = catalogManager.execute(DropTableCommand.builder()
+                .tableName(tableName)
+                .schemaName(Table.DEFAULT_SCHEMA)
+                .build()
+        );
+
+        assertThat(future, willCompleteSuccessfully());
+    }
+
+    /**
+     * Saves stable assignments for the table's partitions, see {@link #saveAssignments}.
+     *
+     * @param zoneName Name of the zone the table belongs to.
+     * @param tableId Table ID.
+     * @param catalog Catalog whose time is written into the assignments.
+     * @param allPartitions If {@code false}, only partitions with even IDs are written.
+     */
+    private void saveStableAssignments(
+            String zoneName,
+            int tableId,
+            Catalog catalog,
+            boolean allPartitions
+    ) throws Exception {
+        saveAssignments(true, zoneName, tableId, catalog, RebalanceUtil::stablePartAssignmentsKey, allPartitions);
+    }
+
+    /**
+     * Saves pending assignments (together with pending change trigger keys) for the table's partitions, see {@link #saveAssignments}.
+     *
+     * @param zoneName Name of the zone the table belongs to.
+     * @param tableId Table ID.
+     * @param catalog Catalog whose time is written into the assignments.
+     * @param allPartitions If {@code false}, only partitions with even IDs are written.
+     */
+    private void savePendingAssignments(
+            String zoneName,
+            int tableId,
+            Catalog catalog,
+            boolean allPartitions
+    ) throws Exception {
+        saveAssignments(false, zoneName, tableId, catalog, RebalanceUtil::pendingPartAssignmentsKey, allPartitions);
+    }
+
+    /**
+     * Saves planned assignments (together with pending change trigger keys) for the table's partitions, see {@link #saveAssignments}.
+     *
+     * @param zoneName Name of the zone the table belongs to.
+     * @param tableId Table ID.
+     * @param catalog Catalog whose time is written into the assignments.
+     * @param allPartitions If {@code false}, only partitions with even IDs are written.
+     */
+    private void savePlannedAssignments(
+            String zoneName,
+            int tableId,
+            Catalog catalog,
+            boolean allPartitions
+    ) throws Exception {
+        saveAssignments(false, zoneName, tableId, catalog, RebalanceUtil::plannedPartAssignmentsKey, allPartitions);
+    }
+
+    /**
+     * Writes assignments for the table's partitions into the meta storage and waits until the write is readable locally.
+     *
+     * <p>Every written assignment consists of a single peer {@code "nodeName"} and carries the time of the given catalog. For
+     * non-stable assignments, the zone's update token is additionally written under the pending change trigger key of each
+     * written partition.
+     *
+     * @param stable Whether stable assignments are being written; if {@code false}, pending change trigger keys are written too.
+     * @param zoneName Name of the zone the table belongs to; the zone must exist in {@code catalog}.
+     * @param tableId Table ID.
+     * @param catalog Catalog that provides the assignments timestamp and the zone descriptor.
+     * @param keyFunction Maps a partition ID to the meta storage key of the assignments being written.
+     * @param allPartitions If {@code false}, only partitions with even IDs are written.
+     * @throws Exception If waiting for the data to be applied fails.
+     */
+    private void saveAssignments(
+            boolean stable,
+            String zoneName,
+            int tableId,
+            Catalog catalog,
+            Function<TablePartitionId, ByteArray> keyFunction,
+            boolean allPartitions
+    ) throws Exception {
+        long timestamp = catalog.time();
+
+        CatalogZoneDescriptor catalogZoneDescriptor = catalog.zone(zoneName);
+        assertNotNull(catalogZoneDescriptor);
+
+        long revision = catalogZoneDescriptor.updateToken();
+        byte[] revisionBytes = ByteUtils.longToBytesKeepingOrder(revision);
+
+        Map<ByteArray, byte[]> metaStorageData = new HashMap<>();
+
+        for (int partitionId = 0; partitionId < catalogZoneDescriptor.partitions(); partitionId++) {
+            // "Not all partitions" means every other partition, starting with partition 0.
+            if (allPartitions || partitionId % 2 == 0) {
+                TablePartitionId tablePartitionId = new TablePartitionId(tableId, partitionId);
+
+                if (!stable) {
+                    metaStorageData.put(
+                            RebalanceUtil.pendingChangeTriggerKey(tablePartitionId),
+                            revisionBytes
+                    );
+                }
+
+                metaStorageData.put(
+                        keyFunction.apply(tablePartitionId),
+                        Assignments.of(timestamp, Assignment.forPeer("nodeName")).toBytes()
+                );
+            }
+        }
+
+        CompletableFuture<?> tableAssignmentsFuture = metaStorageManager.putAll(metaStorageData);
+
+        assertThat(tableAssignmentsFuture, willCompleteSuccessfully());
+
+        // All entries are written by a single putAll call, so any one of them can serve as a probe. The map is not modified
+        // anymore, hence the probe is looked up once instead of on every polling iteration.
+        @Nullable Entry<ByteArray, byte[]> probe = CollectionUtils.first(metaStorageData.entrySet());
+
+        assertNotNull(probe);
+
+        // Wait until the written value can be read from the local meta storage at the applied revision.
+        assertTrue(waitForCondition(() -> {
+            long appliedRevision = metaStorageManager.appliedRevision();
+
+            return Arrays.equals(metaStorageManager.getLocally(probe.getKey(), appliedRevision).value(), probe.getValue());
+        }, 10, 5000));
+    }
+}