diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/rollover/RolloverIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/rollover/RolloverIT.java index aaca77c426671..d4e07aa4251c3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/rollover/RolloverIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/rollover/RolloverIT.java @@ -200,7 +200,7 @@ public void testRolloverWithIndexSettings() throws Exception { final ClusterState state = client().admin().cluster().prepareState().get().getState(); final IndexMetadata oldIndex = state.metadata().index("test_index-2"); final IndexMetadata newIndex = state.metadata().index("test_index-000003"); - assertThat(newIndex.getNumberOfServingShards(), equalTo(1)); + assertThat(newIndex.getNumberOfShards(), equalTo(1)); assertThat(newIndex.getNumberOfReplicas(), equalTo(0)); assertTrue(newIndex.getAliases().containsKey("test_alias")); assertTrue(newIndex.getAliases().containsKey("extra_alias")); @@ -318,7 +318,7 @@ public void testRolloverWithIndexSettingsBalancedWithUseZoneForReplicaDefaultCou final ClusterState state = client().admin().cluster().prepareState().get().getState(); final IndexMetadata newIndex = state.metadata().index("test_index-000003"); - assertThat(newIndex.getNumberOfServingShards(), equalTo(3)); + assertThat(newIndex.getNumberOfShards(), equalTo(3)); assertThat(newIndex.getNumberOfReplicas(), equalTo(2)); manageReplicaSettingForDefaultReplica(false); randomIndexTemplate(); @@ -348,7 +348,7 @@ public void testRolloverWithIndexSettingsWithoutPrefix() throws Exception { final ClusterState state = client().admin().cluster().prepareState().get().getState(); final IndexMetadata oldIndex = state.metadata().index("test_index-2"); final IndexMetadata newIndex = state.metadata().index("test_index-000003"); - assertThat(newIndex.getNumberOfServingShards(), equalTo(1)); + 
assertThat(newIndex.getNumberOfShards(), equalTo(1)); assertThat(newIndex.getNumberOfReplicas(), equalTo(0)); assertTrue(newIndex.getAliases().containsKey("test_alias")); assertTrue(newIndex.getAliases().containsKey("extra_alias")); diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index 5b03beb69a4e6..5ceb1b1f069e7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -180,7 +180,7 @@ private Map assertAndCapturePrimaryTerms(Map pre for (final IndexMetadata indexMetadata : state.metadata().indices().values()) { final String index = indexMetadata.getIndex().getName(); final long[] previous = previousTerms.get(index); - final long[] current = IntStream.range(0, indexMetadata.getNumberOfServingShards() + indexMetadata.getNumOfNonServingShards()) + final long[] current = IntStream.range(0, indexMetadata.getNumberOfShards()) .mapToLong(indexMetadata::primaryTerm).toArray(); if (previous == null) { result.put(index, current); diff --git a/server/src/internalClusterTest/java/org/opensearch/shardsplit/InPlaceShardSplitIT.java b/server/src/internalClusterTest/java/org/opensearch/shardsplit/InPlaceShardSplitIT.java index d582a3f79eac9..7627836c119b4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/shardsplit/InPlaceShardSplitIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/shardsplit/InPlaceShardSplitIT.java @@ -16,6 +16,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.ShardRange; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -27,9 
+28,11 @@ import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.OpenSearchIntegTestCase; +import java.util.Arrays; import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.*; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; @@ -48,7 +51,8 @@ private Set triggerSplitAndGetChildShardIds(int parentShardId, int numb assertAcked(response); ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); IndexMetadata indexMetadata = clusterState.metadata().index("test"); - return new HashSet<>(indexMetadata.getChildShardIds(parentShardId)); + ShardRange[] shards = indexMetadata.getSplitShardsMetadata().getChildShardsOfParent(parentShardId); + return Arrays.stream(shards).map(ShardRange::getShardId).collect(Collectors.toSet()); } private void waitForSplit(int numberOfSplits, Set childShardIds, int parentShardId) throws Exception { @@ -89,10 +93,12 @@ private void assertClusterHealth() { private void verifyAfterSplit(long totalIndexedDocs, Set ids, int parentShardId, Set childShardIds) throws InterruptedException { ClusterState clusterState = internalCluster().clusterManagerClient().admin().cluster().prepareState().get().getState(); IndexMetadata indexMetadata = clusterState.metadata().index("test"); - assertTrue(indexMetadata.isParentShard(parentShardId)); - assertEquals(childShardIds, new HashSet<>(indexMetadata.getChildShardIds(parentShardId))); + assertNotNull(indexMetadata.getSplitShardsMetadata().getChildShardsOfParent(parentShardId)); + ShardRange[] shards = indexMetadata.getSplitShardsMetadata().getChildShardsOfParent(parentShardId); + Set currentShardIds = Arrays.stream(shards).map(ShardRange::getShardId).collect(Collectors.toSet()); + assertEquals(childShardIds, currentShardIds); Set newServingChildShardIds = new HashSet<>(); - for (int shardId : indexMetadata.getServingShardIds()) { + for (int 
shardId : currentShardIds) { assertTrue(parentShardId != shardId); if (childShardIds.contains(shardId)) newServingChildShardIds.add(shardId); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index 071db5a5a01cc..7f6c039cf2ecc 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -429,7 +429,8 @@ private Map snapshotShards( IndexId indexId = repositoryData.resolveIndexId(index); IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotInfo.snapshotId(), indexId); if (indexMetadata != null) { - for (int i : indexMetadata.getServingShardIds()) { + int numberOfShards = indexMetadata.getNumberOfShards(); + for (int i = 0; i < numberOfShards; i++) { ShardId shardId = new ShardId(indexMetadata.getIndex(), i); SnapshotShardFailure shardFailure = findShardFailure(snapshotInfo.shardFailures(), shardId); if (shardFailure != null) { diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java index dcf4f854498e9..23cd48ae09341 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java @@ -135,9 +135,7 @@ protected ShardsIterator shards(ClusterState clusterState, PitSegmentsRequest re null, null, null, - -1L, - null, - null + -1L ) ); } @@ -230,9 +228,7 @@ public PitAwareShardRouting( RecoverySource recoverySource, UnassignedInfo unassignedInfo, AllocationId allocationId, - 
long expectedShardSize, - ShardId[] childShardIds, - ShardId splittingShardId + long expectedShardSize ) { super( shardId, @@ -244,8 +240,9 @@ public PitAwareShardRouting( unassignedInfo, allocationId, expectedShardSize, - childShardIds, - splittingShardId + null, + null, + null ); this.pitId = pitId; } diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 7e9ee6dc2e9f5..997d0f60192bf 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -879,7 +879,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index // split recovery where after all child shards are added to replication tracker, bulk // operations are replicated to all child primaries. int computedShardId = OperationRouting.generateShardId(indexMetadata, item.request().id(), - item.request().routing(), (shardId) -> true); + item.request().routing(), true); discardOperation = computedShardId != replica.shardId().id(); } diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 1d433700d28a8..c46e57291bda3 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -49,6 +49,7 @@ import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.ShardRange; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.GroupShardsIterator; @@ -1441,10 +1442,12 @@ static List 
getLocalLocalShardsIteratorFromPointInTime( final ShardId shardId = entry.getKey(); IndexMetadata indexMetadata = clusterState.metadata().getIndexSafe(shardId.getIndex()); final List allShardIds; - if (indexMetadata.isNonServingShard(shardId.id()) && indexMetadata.isParentShard(shardId.id())) { - List childShardIDs = indexMetadata.getSplitMetadata(shardId.id()).getChildShards(); + if (indexMetadata.getSplitShardsMetadata().isEmptyParentShard(shardId.id())) { + ShardRange[] childShards = indexMetadata.getSplitShardsMetadata().getChildShardsOfParent(shardId.id()); allShardIds = new ArrayList<>(); - childShardIDs.forEach(childShardId -> allShardIds.add(new ShardId(shardId.getIndex(), childShardId))); + for (ShardRange childShard : childShards) { + allShardIds.add(new ShardId(shardId.getIndex(), childShard.getShardId())); + } } else { allShardIds = List.of(shardId); } diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index 397bd2268c27f..4e28dbfb35acb 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -590,7 +590,7 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere primaryShardReference.close(); // release shard operation lock as soon as possible if (primaryShardReference.routingEntry().splitting()) { // This means shard was being split and was in relocation handoff stage when replication op on primary arrived. - // Write ops specifically will now get retried and will be routed to respective child shards. + // Write ops specifically will now get retried and will be routed to respective child shards by coordinator. throw new PrimaryShardSplitException("Primary shard is already split. 
Cannot perform replication operation on parent primary."); } @@ -1071,8 +1071,16 @@ protected void doRun() { : "request waitForActiveShards must be set in resolveRequest"; ShardRouting primary = null; - if (indexMetadata.isParentShard(request.shardId().id()) && indexMetadata.isNonServingShard(request.shardId.id())) { - throw new PrimaryShardSplitException("Primary shard is already split. Cannot perform replication operation on parent primary."); + if (indexMetadata.getSplitShardsMetadata().isEmptyParentShard(request.shardId().id())) { + if (state.version() < request.routedBasedOnClusterVersion()) { + // This will get retried on coordinator. Entire request will be re-driven on respective child shards. + // Since, we are throwing a custom exception, coordinator will re-drive it explicitly on child shards + // even if it is also stale and yet to receive update from cluster manager. + throw new PrimaryShardSplitException("Primary shard is already split. Cannot perform replication operation on parent primary."); + } else { + finishAsFailed(new IndexNotFoundException(request.shardId().getIndex())); + return; + } } else { IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(request.shardId().getIndex()); IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(request.shardId().id()); diff --git a/server/src/main/java/org/opensearch/cluster/ClusterState.java b/server/src/main/java/org/opensearch/cluster/ClusterState.java index f2fc8f2c66c5e..ab879d1641ada 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterState.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterState.java @@ -370,7 +370,7 @@ public String toString() { .append(indexMetadata.getAliasesVersion()) .append("]\n"); for (int shard = 0; shard < indexMetadata.getNumberOfShards(); shard++) { - if (indexMetadata.isServingShard(shard)) { + if (indexMetadata.getSplitShardsMetadata().isEmptyParentShard(shard) == false) { sb.append(TAB).append(TAB).append(shard).append(": 
"); sb.append("p_term [").append(indexMetadata.primaryTerm(shard)).append("], "); sb.append("isa_ids ").append(indexMetadata.inSyncAllocationIds(shard)).append("\n"); diff --git a/server/src/main/java/org/opensearch/cluster/DiffableUtils.java b/server/src/main/java/org/opensearch/cluster/DiffableUtils.java index 620f09fd9267c..a38fc81bebc08 100644 --- a/server/src/main/java/org/opensearch/cluster/DiffableUtils.java +++ b/server/src/main/java/org/opensearch/cluster/DiffableUtils.java @@ -566,29 +566,4 @@ public Set read(StreamInput in, K key) throws IOException { return Collections.unmodifiableSet(new HashSet<>(Arrays.asList(in.readStringArray()))); } } - - /** - * Implementation of ValueSerializer that serializes immutable sets - * - * @param type of map key - * - * @opensearch.internal - */ - public static class IntegerSetValueSerializer extends NonDiffableValueSerializer> { - private static final IntegerSetValueSerializer INSTANCE = new IntegerSetValueSerializer(); - - public static IntegerSetValueSerializer getInstance() { - return INSTANCE; - } - - @Override - public void write(Set value, StreamOutput out) throws IOException { - out.writeCollection(value, StreamOutput::writeVInt); - } - - @Override - public Set read(StreamInput in, K key) throws IOException { - return Collections.unmodifiableSet(in.readSet(StreamInput::readVInt)); - } - } } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 8535887a20466..fa82faffb5674 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -636,9 +636,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException { static final String KEY_ALIASES = "aliases"; static final String KEY_ROLLOVER_INFOS = "rollover_info"; static final String KEY_SYSTEM = "system"; - static final String 
KEY_SERVING_SHARD_IDS = "serving_shard_ids"; - static final String KEY_NUM_OF_NON_SERVING_SHARDS = "num_of_non_serving_shards"; - static final String KEY_PARENT_TO_CHILD_SHARD_METADATA = "parent_to_child_shards_metadata"; + static final String KEY_SPLIT_SHARDS_METADATA = "split_shards_metadata"; public static final String KEY_PRIMARY_TERMS = "primary_terms"; public static final String INDEX_STATE_FILE_PREFIX = "state-"; @@ -647,7 +645,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException { private final int routingFactor; private final int routingPartitionSize; - private final int numberOfSeedShards; + private final int numberOfShards; private final int numberOfReplicas; private final Index index; @@ -686,9 +684,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException { private final ActiveShardCount waitForActiveShards; private final Map rolloverInfos; private final boolean isSystem; - private final Map parentToChildShardsMetadata; - private final int[] servingShardIds; - private final int numberOfNonServingShards; + private final SplitShardsMetadata splitShardsMetadata; private IndexMetadata( final Index index, @@ -698,7 +694,7 @@ private IndexMetadata( final long aliasesVersion, final long[] primaryTerms, final State state, - final int numberOfSeedShards, + final int numberOfShards, final int numberOfReplicas, final Settings settings, final Map mappings, @@ -716,9 +712,7 @@ private IndexMetadata( final ActiveShardCount waitForActiveShards, final Map rolloverInfos, final boolean isSystem, - final Map parentToChildShardsMetadata, - final int[] servingShardIds, - final int numberOfNonServingShards + final SplitShardsMetadata splitShardsMetadata ) { this.index = index; @@ -730,11 +724,11 @@ private IndexMetadata( assert aliasesVersion >= 0 : aliasesVersion; this.aliasesVersion = aliasesVersion; this.primaryTerms = primaryTerms; - assert primaryTerms.length == servingShardIds.length + numberOfNonServingShards; + assert 
primaryTerms.length == numberOfShards; this.state = state; - this.numberOfSeedShards = numberOfSeedShards; + this.numberOfShards = numberOfShards; this.numberOfReplicas = numberOfReplicas; - this.totalNumberOfShards = servingShardIds.length * (numberOfReplicas + 1); + this.totalNumberOfShards = numberOfShards * (numberOfReplicas + 1); this.settings = settings; this.mappings = Collections.unmodifiableMap(mappings); this.customData = Collections.unmodifiableMap(customData); @@ -747,15 +741,14 @@ private IndexMetadata( this.indexCreatedVersion = indexCreatedVersion; this.indexUpgradedVersion = indexUpgradedVersion; this.routingNumShards = routingNumShards; - this.routingFactor = routingNumShards / numberOfSeedShards; + this.routingFactor = routingNumShards / splitShardsMetadata.getNumberOfRootShards(); this.routingPartitionSize = routingPartitionSize; this.waitForActiveShards = waitForActiveShards; this.rolloverInfos = Collections.unmodifiableMap(rolloverInfos); this.isSystem = isSystem; - this.parentToChildShardsMetadata = Collections.unmodifiableMap(parentToChildShardsMetadata); - this.servingShardIds = servingShardIds; - this.numberOfNonServingShards = numberOfNonServingShards; - assert numberOfSeedShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfSeedShards; + this.splitShardsMetadata = splitShardsMetadata; + assert splitShardsMetadata.getNumberOfRootShards() * routingFactor == routingNumShards : routingNumShards + + " must be a multiple of " + splitShardsMetadata.getNumberOfRootShards(); } public Index getIndex() { @@ -830,23 +823,7 @@ public State getState() { } public int getNumberOfShards() { - return servingShardIds.length; - } - - public int getNumberOfSeedShards() { - return numberOfSeedShards; - } - - public int getNumberOfServingShards() { - return servingShardIds.length; - } - - public int getNumOfNonServingShards() { - return numberOfNonServingShards; - } - - public int[] getServingShardIds() { - return 
servingShardIds.clone(); + return numberOfShards; } public long[] getPrimaryTerms() { @@ -923,30 +900,11 @@ public Map getRolloverInfos() { return rolloverInfos; } - public boolean isParentShard(Integer shardId) { - return parentToChildShardsMetadata.containsKey(shardId); - } - - public boolean isNonServingShard(Integer shardId) { - return primaryTerms[shardId] == SPLIT_PARENT_TERM; - } - - public boolean isServingShard(Integer shardId) { - return primaryTerms[shardId] != SPLIT_PARENT_TERM; - } - - public SplitMetadata getSplitMetadata(Integer shardId) { - assert isParentShard(shardId); - return parentToChildShardsMetadata.get(shardId); - } - - public List getChildShardIds(int shardId) { - assert isParentShard(shardId); - return new ArrayList<>(parentToChildShardsMetadata.get(shardId).getChildShards()); + public SplitShardsMetadata getSplitShardsMetadata() { + return splitShardsMetadata; } public Set inSyncAllocationIds(int shardId) { - assert primaryTerms[shardId] != SPLIT_PARENT_TERM; return inSyncAllocationIds.get(shardId); } @@ -1021,13 +979,7 @@ public boolean equals(Object o) { if (isSystem != that.isSystem) { return false; } - if (!parentToChildShardsMetadata.equals(that.parentToChildShardsMetadata)) { - return false; - } - if (Arrays.equals(servingShardIds, that.servingShardIds) == false) { - return false; - } - if (numberOfNonServingShards != that.numberOfNonServingShards) { + if (!splitShardsMetadata.equals(that.splitShardsMetadata)) { return false; } return true; @@ -1048,9 +1000,7 @@ public int hashCode() { result = 31 * result + inSyncAllocationIds.hashCode(); result = 31 * result + rolloverInfos.hashCode(); result = 31 * result + Boolean.hashCode(isSystem); - result = 31 * result + parentToChildShardsMetadata.hashCode(); - result = 31 * result + Arrays.hashCode(servingShardIds); - result = 31 * result + Integer.hashCode(numberOfNonServingShards); + result = 31 * result + splitShardsMetadata.hashCode(); return result; } @@ -1095,9 +1045,7 @@ private 
static class IndexMetadataDiff implements Diff { private final Diff>> inSyncAllocationIds; private final Diff> rolloverInfos; private final boolean isSystem; - private Diff> parentToChildShardsMetadata; - private int[] servingShardIds; - private int numberOfNonServingShards; + private Diff splitMetadata; IndexMetadataDiff(IndexMetadata before, IndexMetadata after) { index = after.index.getName(); @@ -1120,18 +1068,7 @@ private static class IndexMetadataDiff implements Diff { ); rolloverInfos = DiffableUtils.diff(before.rolloverInfos, after.rolloverInfos, DiffableUtils.getStringKeySerializer()); isSystem = after.isSystem; - if (before.parentToChildShardsMetadata != null && after.parentToChildShardsMetadata != null) { - parentToChildShardsMetadata = DiffableUtils.diff( - before.parentToChildShardsMetadata, - after.parentToChildShardsMetadata, - DiffableUtils.getVIntKeySerializer() - ); - } - if (before.servingShardIds != null && after.servingShardIds != null) { - servingShardIds = after.servingShardIds; - numberOfNonServingShards = after.numberOfNonServingShards; - } - + splitMetadata = after.splitShardsMetadata.diff(before.splitShardsMetadata); } private static final DiffableUtils.DiffableValueReader ALIAS_METADATA_DIFF_VALUE_READER = @@ -1142,8 +1079,6 @@ private static class IndexMetadataDiff implements Diff { new DiffableUtils.DiffableValueReader<>(DiffableStringMap::readFrom, DiffableStringMap::readDiffFrom); private static final DiffableUtils.DiffableValueReader ROLLOVER_INFO_DIFF_VALUE_READER = new DiffableUtils.DiffableValueReader<>(RolloverInfo::new, RolloverInfo::readDiffFrom); - private static final DiffableUtils.DiffableValueReader SPLIT_METADATA_DIFF_VALUE_READER = - new DiffableUtils.DiffableValueReader<>(SplitMetadata::new, SplitMetadata::readDiffFrom); IndexMetadataDiff(StreamInput in) throws IOException { index = in.readString(); @@ -1171,13 +1106,7 @@ private static class IndexMetadataDiff implements Diff { rolloverInfos = 
DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), ROLLOVER_INFO_DIFF_VALUE_READER); isSystem = in.readBoolean(); if (in.getVersion().onOrAfter(Version.V_3_0_0)) { - parentToChildShardsMetadata = DiffableUtils.readJdkMapDiff( - in, - DiffableUtils.getVIntKeySerializer(), - SPLIT_METADATA_DIFF_VALUE_READER - ); - servingShardIds = in.readVIntArray(); - numberOfNonServingShards = in.readVInt(); + splitMetadata = SplitShardsMetadata.readDiffFrom(in); } } @@ -1203,9 +1132,7 @@ public void writeTo(StreamOutput out) throws IOException { rolloverInfos.writeTo(out); out.writeBoolean(isSystem); if (out.getVersion().onOrAfter(Version.V_3_0_0)) { - parentToChildShardsMetadata.writeTo(out); - out.writeVIntArray(servingShardIds); - out.writeVInt(numberOfNonServingShards); + splitMetadata.writeTo(out); } } @@ -1226,13 +1153,7 @@ public IndexMetadata apply(IndexMetadata part) { builder.inSyncAllocationIds.putAll(inSyncAllocationIds.apply(part.inSyncAllocationIds)); builder.rolloverInfos.putAll(rolloverInfos.apply(part.rolloverInfos)); builder.system(part.isSystem); - if (parentToChildShardsMetadata != null) { - builder.parentToChildShardsMetadata.putAll(parentToChildShardsMetadata.apply(part.parentToChildShardsMetadata)); - } - if (servingShardIds != null) { - builder.servingShardIds(servingShardIds); - builder.numberOfNonServingShards(numberOfNonServingShards); - } + builder.splitShardsMetadata(splitMetadata.apply(part.splitShardsMetadata)); return builder.build(); } } @@ -1279,12 +1200,7 @@ public static IndexMetadata readFrom(StreamInput in) throws IOException { } builder.system(in.readBoolean()); if (in.getVersion().onOrAfter(Version.V_3_0_0)) { - int parentShardsMetadataSize = in.readVInt(); - for(int i=0; i < parentShardsMetadataSize; i++) { - builder.putParentToChildShardMetadata(new SplitMetadata(in)); - } - builder.servingShardIds(in.readVIntArray()); - builder.numberOfNonServingShards(in.readVInt()); + builder.splitShardsMetadata(new 
SplitShardsMetadata(in)); } return builder.build(); } @@ -1328,14 +1244,7 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeBoolean(isSystem); if (out.getVersion().onOrAfter(Version.V_3_0_0)) { - out.writeVInt(parentToChildShardsMetadata.size()); - for(final SplitMetadata splitMetadata : parentToChildShardsMetadata.values()) { - splitMetadata.writeTo(out); - } - out.writeVIntArray(servingShardIds); - out.writeVInt(numberOfNonServingShards); - } else if (parentToChildShardsMetadata.isEmpty() == false) { - throw new IllegalStateException("In-place split not allowed on older versions."); + splitShardsMetadata.writeTo(out); } } @@ -1374,9 +1283,7 @@ public static class Builder { private final Map rolloverInfos; private Integer routingNumShards; private boolean isSystem; - private final Map parentToChildShardsMetadata; - private int[] servingShardIds; - private int numberOfNonServingShards; + private SplitShardsMetadata splitShardsMetadata; public Builder(String index) { this.index = index; @@ -1385,7 +1292,6 @@ public Builder(String index) { this.customMetadata = new HashMap<>(); this.inSyncAllocationIds = new HashMap<>(); this.rolloverInfos = new HashMap<>(); - this.parentToChildShardsMetadata = new HashMap<>(); this.isSystem = false; } @@ -1405,9 +1311,7 @@ public Builder(IndexMetadata indexMetadata) { this.inSyncAllocationIds = new HashMap<>(indexMetadata.inSyncAllocationIds); this.rolloverInfos = new HashMap<>(indexMetadata.rolloverInfos); this.isSystem = indexMetadata.isSystem; - this.parentToChildShardsMetadata = new HashMap<>(indexMetadata.parentToChildShardsMetadata); - this.servingShardIds = indexMetadata.servingShardIds; - this.numberOfNonServingShards = indexMetadata.numberOfNonServingShards; + this.splitShardsMetadata = new SplitShardsMetadata.Builder(indexMetadata.splitShardsMetadata).build(); } public Builder index(String index) { @@ -1449,86 +1353,32 @@ public int numberOfShards() { return 
settings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1); } - private Builder servingShardIds(int[] servingShardIds) { - this.servingShardIds = servingShardIds; - return this; - } - - private Builder numberOfNonServingShards(int numberOfNonServingShards) { - this.numberOfNonServingShards = numberOfNonServingShards; - return this; - } - public Builder updateMetadataForNewChildShards(Map newAllocationIds, int sourceShardId) { - if (parentToChildShardsMetadata.get(sourceShardId) == null) { - // parentToChildShardsMetadata will always have this parent at this point but if we somehow reached here - // due to a missed fail shard event and shard split went through, then we will need to re-populate this - // as now shard is already split. - addChildShardsForSplittingShard(sourceShardId, new ArrayList<>(newAllocationIds.keySet())); - } - int numberOfNewShards = newAllocationIds.size(); - int numberOfServingShards = this.servingShardIds.length + numberOfNewShards - 1; - int[] newServingShards = new int[numberOfServingShards]; - int newIdx = 0; - // Remove source shard from serving shard ids. - for (int servingShardId : this.servingShardIds) { - if (servingShardId != sourceShardId) { - newServingShards[newIdx++] = servingShardId; - } - } - // Add new serving shard ids. 
- for (int newShardId : newAllocationIds.keySet()) { - assert inSyncAllocationIds.containsKey(newShardId) == false; - newServingShards[newIdx++] = newShardId; - } - this.servingShardIds = newServingShards; // Now update primary terms against child shard ids - int numOfNewPrimaryTerms = this.primaryTerms.length + newAllocationIds.size(); - long []newPrimaryTerms = Arrays.copyOf(this.primaryTerms, numOfNewPrimaryTerms); + int numOfFinalPrimaryTerms = this.primaryTerms.length + newAllocationIds.size(); + long []finalPrimaryTerms = Arrays.copyOf(this.primaryTerms, numOfFinalPrimaryTerms); long parentPrimaryTerm = this.primaryTerms[sourceShardId]; - Arrays.fill(newPrimaryTerms, this.primaryTerms.length, numOfNewPrimaryTerms, parentPrimaryTerm); - this.primaryTerms = newPrimaryTerms; - - // Set parent primary term as split - primaryTerm(sourceShardId, IndexMetadata.SPLIT_PARENT_TERM); - // Increment number of non serving shards - this.numberOfNonServingShards++; + Arrays.fill(finalPrimaryTerms, this.primaryTerms.length, numOfFinalPrimaryTerms, parentPrimaryTerm); + this.primaryTerms = finalPrimaryTerms; // Add in-sync allocations of child shards newAllocationIds.forEach((shardId, newAllocationId) -> this.inSyncAllocationIds.put( shardId, Sets.newHashSet(newAllocationId))); - // Remove in-sync allocations of parent - this.inSyncAllocationIds.remove(sourceShardId); - - return this; - } + SplitShardsMetadata.Builder splitShardsMetadata = new SplitShardsMetadata.Builder(this.splitShardsMetadata); + splitShardsMetadata.updateSplitMetadataForChildShards(sourceShardId, newAllocationIds.keySet()); - public Builder addChildShardsForSplittingShard(int sourceShardId, List childShardIds) { - Integer parentOfSource = null; - for (Integer parent : parentToChildShardsMetadata.keySet()) { - SplitMetadata splitMetadata = parentToChildShardsMetadata.get(parent); - for (Integer childShard : splitMetadata.getChildShards()) { - if (sourceShardId == childShard) { - parentOfSource = parent; - 
break; - } - } - if (parentOfSource != null) { - break; - } - } - - int parentRoutingFactor = parentOfSource == null ? routingNumShards / INDEX_NUMBER_OF_SHARDS_SETTING.get(settings): - parentToChildShardsMetadata.get(parentOfSource).getRoutingFactor(); + this.splitShardsMetadata = splitShardsMetadata.build(); + numberOfShards(this.splitShardsMetadata.getNumberOfShards()); + this.settingsVersion += 1; - SplitMetadata splitMetadata = new SplitMetadata(sourceShardId, childShardIds, parentRoutingFactor); - putParentToChildShardMetadata(splitMetadata); return this; } - public Builder removeParentToChildShardMetadata(Integer parentShardId) { - parentToChildShardsMetadata.remove(parentShardId); + public Builder cancelSplit(int sourceShardId) { + SplitShardsMetadata.Builder splitShardsMetadata = new SplitShardsMetadata.Builder(this.splitShardsMetadata); + splitShardsMetadata.cancelSplit(sourceShardId); + this.splitShardsMetadata = splitShardsMetadata.build(); return this; } @@ -1626,8 +1476,8 @@ public Builder putRolloverInfo(RolloverInfo rolloverInfo) { return this; } - public Builder putParentToChildShardMetadata(SplitMetadata splitMetadata) { - parentToChildShardsMetadata.put(splitMetadata.getParentShardId(), splitMetadata); + public Builder splitShardsMetadata(SplitShardsMetadata splitShardsMetadata) { + this.splitShardsMetadata = splitShardsMetadata; return this; } @@ -1723,13 +1573,10 @@ public IndexMetadata build() { if (INDEX_NUMBER_OF_SHARDS_SETTING.exists(settings) == false) { throw new IllegalArgumentException("must specify number of shards for index [" + index + "]"); } - final int numberOfSeedShards = INDEX_NUMBER_OF_SHARDS_SETTING.get(settings); - if (servingShardIds == null) { - servingShardIds = new int[numberOfSeedShards]; - for (int i=0; i < numberOfSeedShards; i++) { - servingShardIds[i] = i; - } + if (splitShardsMetadata == null) { + this.splitShardsMetadata = new SplitShardsMetadata.Builder(numberOfShards()).build(); } + final int numberOfShards = 
splitShardsMetadata.getNumberOfShards(); if (INDEX_NUMBER_OF_REPLICAS_SETTING.exists(settings) == false) { throw new IllegalArgumentException("must specify number of replicas for index [" + index + "]"); @@ -1752,7 +1599,7 @@ public IndexMetadata build() { // fill missing slots in inSyncAllocationIds with empty set if needed and make all entries immutable final Map> filledInSyncAllocationIds = new HashMap<>(); - for (int i : servingShardIds) { + for (int i = 0; i < numberOfShards; i++) { if (inSyncAllocationIds.containsKey(i)) { filledInSyncAllocationIds.put(i, Collections.unmodifiableSet(new HashSet<>(inSyncAllocationIds.get(i)))); } else { @@ -1792,12 +1639,12 @@ public IndexMetadata build() { if (primaryTerms == null) { initializePrimaryTerms(); - } else if (primaryTerms.length != servingShardIds.length + numberOfNonServingShards) { + } else if (primaryTerms.length != numberOfShards) { throw new IllegalStateException( "primaryTerms length is [" + primaryTerms.length + "] but should be equal to total of serving shards and empty shards [" - + (servingShardIds.length + numberOfNonServingShards) + + numberOfShards() + "]" ); } @@ -1826,7 +1673,7 @@ public IndexMetadata build() { aliasesVersion, primaryTerms, state, - numberOfSeedShards, + numberOfShards, numberOfReplicas, tmpSettings, mappings, @@ -1844,9 +1691,7 @@ public IndexMetadata build() { waitForActiveShards, rolloverInfos, isSystem, - parentToChildShardsMetadata, - servingShardIds, - numberOfNonServingShards + splitShardsMetadata ); } @@ -1914,7 +1759,7 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build builder.endObject(); builder.startArray(KEY_PRIMARY_TERMS); - for (int i = 0; i < indexMetadata.servingShardIds.length + indexMetadata.numberOfNonServingShards; i++) { + for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { builder.value(indexMetadata.primaryTerm(i)); } builder.endArray(); @@ -1948,17 +1793,9 @@ public static void toXContent(IndexMetadata 
indexMetadata, XContentBuilder build } builder.endObject(); - builder.startObject(KEY_PARENT_TO_CHILD_SHARD_METADATA); - for (final SplitMetadata cursor : indexMetadata.parentToChildShardsMetadata.values()) { - cursor.toXContent(builder); - } + builder.startObject(KEY_SPLIT_SHARDS_METADATA); + indexMetadata.splitShardsMetadata.toXContent(builder, params); builder.endObject(); - builder.startArray(KEY_SERVING_SHARD_IDS); - for (final int cursor : indexMetadata.servingShardIds) { - builder.value(cursor); - } - builder.endArray(); - builder.field(KEY_NUM_OF_NON_SERVING_SHARDS, indexMetadata.numberOfNonServingShards); builder.field(KEY_SYSTEM, indexMetadata.isSystem); @@ -2043,16 +1880,8 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti // simply ignored when upgrading from 2.x assert Version.CURRENT.major <= 5; parser.skipChildren(); - } else if (KEY_PARENT_TO_CHILD_SHARD_METADATA.equals(currentFieldName)) { - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_OBJECT) { - builder.putParentToChildShardMetadata(SplitMetadata.parse(parser, currentFieldName)); - } else { - throw new IllegalArgumentException("Unexpected token: " + token); - } - } + } else if (KEY_SPLIT_SHARDS_METADATA.equals(currentFieldName)) { + builder.splitShardsMetadata(SplitShardsMetadata.parse(parser)); } else { // assume it's custom index metadata builder.putCustom(currentFieldName, parser.mapStrings()); @@ -2080,16 +1909,6 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti } } builder.primaryTerms(list.stream().mapToLong(i -> i).toArray()); - } else if (KEY_SERVING_SHARD_IDS.equals(currentFieldName)) { - final List list = new ArrayList<>(); - while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { - if (token == XContentParser.Token.VALUE_NUMBER) { - 
list.add(parser.intValue()); - } else { - throw new IllegalStateException("found a non-numeric value under [" + KEY_SERVING_SHARD_IDS + "]"); - } - } - builder.servingShardIds(list.stream().mapToInt(i -> i).toArray()); } else { throw new IllegalArgumentException("Unexpected field for an array " + currentFieldName); } @@ -2111,8 +1930,6 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti builder.setRoutingNumShards(parser.intValue()); } else if (KEY_SYSTEM.equals(currentFieldName)) { builder.system(parser.booleanValue()); - } else if (KEY_NUM_OF_NON_SERVING_SHARDS.equals(currentFieldName)) { - builder.numberOfNonServingShards(parser.intValue()); } else { throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]"); } @@ -2205,7 +2022,7 @@ public IndexMetadata fromXContent(XContentParser parser) throws IOException { /** * Returns the number of shards that should be used for routing. This basically defines the hash space we use in * {@link org.opensearch.cluster.routing.OperationRouting#generateShardId(IndexMetadata, String, String)} to route documents - * to shards based on their ID or their specific routing value. The default value is {@link #getNumberOfSeedShards()}. This value only + * to shards based on their ID or their specific routing value. The default value is {@link #getNumberOfShards()}. This value only * changes if and index is shrunk. */ public int getRoutingNumShards() { @@ -2229,7 +2046,12 @@ public int getRoutingFactor() { * @return a the source shard ID to split off from */ public static ShardId selectSplitShard(int shardId, IndexMetadata sourceIndexMetadata, int numTargetShards) { - int numSourceShards = sourceIndexMetadata.getNumberOfSeedShards(); + if (sourceIndexMetadata.getNumberOfShards() > sourceIndexMetadata.getSplitShardsMetadata().getNumberOfRootShards()) { + throw new IllegalArgumentException( + "Cannot perform an index level split as one or more shards of this index has been split. 
" + ); + } + int numSourceShards = sourceIndexMetadata.getNumberOfShards(); if (shardId >= numTargetShards) { throw new IllegalArgumentException( "the number of target shards (" + numTargetShards + ") must be greater than the shard id: " + shardId @@ -2294,9 +2116,9 @@ private static void assertSplitMetadata(int numSourceShards, int numTargetShards * @param numTargetShards the number of target shards */ public static Set selectRecoverFromShards(int shardId, IndexMetadata sourceIndexMetadata, int numTargetShards) { - if (sourceIndexMetadata.getNumberOfSeedShards() > numTargetShards) { + if (sourceIndexMetadata.getNumberOfShards() > numTargetShards) { return selectShrinkShards(shardId, sourceIndexMetadata, numTargetShards); - } else if (sourceIndexMetadata.getNumberOfSeedShards() < numTargetShards) { + } else if (sourceIndexMetadata.getNumberOfShards() < numTargetShards) { return Collections.singleton(selectSplitShard(shardId, sourceIndexMetadata, numTargetShards)); } else { return Collections.singleton(selectCloneShard(shardId, sourceIndexMetadata, numTargetShards)); @@ -2311,21 +2133,26 @@ public static Set selectRecoverFromShards(int shardId, IndexMetadata so * @return a set of shard IDs to shrink into the given shard ID. */ public static Set selectShrinkShards(int shardId, IndexMetadata sourceIndexMetadata, int numTargetShards) { + if (sourceIndexMetadata.getNumberOfShards() > sourceIndexMetadata.getSplitShardsMetadata().getNumberOfRootShards()) { + throw new IllegalArgumentException( + "Cannot perform an index level shrink as one or more shards of this index has been split. 
" + ); + } if (shardId >= numTargetShards) { throw new IllegalArgumentException( "the number of target shards (" + numTargetShards + ") must be greater than the shard id: " + shardId ); } - if (sourceIndexMetadata.getNumberOfSeedShards() < numTargetShards) { + if (sourceIndexMetadata.getNumberOfShards() < numTargetShards) { throw new IllegalArgumentException( "the number of target shards [" + numTargetShards + "] must be less that the number of source shards [" - + sourceIndexMetadata.getNumberOfSeedShards() + + sourceIndexMetadata.getNumberOfShards() + "]" ); } - int routingFactor = getRoutingFactor(sourceIndexMetadata.getNumberOfSeedShards(), numTargetShards); + int routingFactor = getRoutingFactor(sourceIndexMetadata.getNumberOfShards(), numTargetShards); Set shards = new HashSet<>(routingFactor); for (int i = shardId * routingFactor; i < routingFactor * shardId + routingFactor; i++) { shards.add(new ShardId(sourceIndexMetadata.getIndex(), i)); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataInPlaceShardSplitService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataInPlaceShardSplitService.java index d4e69c3ae65cd..ed1636721e13b 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataInPlaceShardSplitService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataInPlaceShardSplitService.java @@ -131,14 +131,13 @@ public ClusterState applyShardSplitRequest( ) { IndexMetadata curIndexMetadata = currentState.metadata().index(request.getIndex()); ShardId sourceShardId = new ShardId(curIndexMetadata.getIndex(), request.getShardId()); - if (curIndexMetadata.isParentShard(sourceShardId.id())) { - try { - currentState.getRoutingTable().shardRoutingTable(request.getIndex(), request.getShardId()); - throw new IllegalArgumentException("Splitting of this shard is already in progress"); - } catch (ShardNotFoundException ex) { - // Shard is already split. 
- throw new IllegalArgumentException("Shard is already split."); - } + if (curIndexMetadata.getSplitShardsMetadata().getInProgressSplitShardId() != SplitShardsMetadata.SPLIT_NOT_IN_PROGRESS) { + int inProgressSplitShard = curIndexMetadata.getSplitShardsMetadata().getInProgressSplitShardId(); + throw new IllegalArgumentException("Splitting of shard [" + inProgressSplitShard + "] is already in progress"); + } + + if (curIndexMetadata.getSplitShardsMetadata().isEmptyParentShard(request.getShardId())) { + throw new IllegalArgumentException("Shard [" + request.getShardId() + "] has already been split."); } Tuple shardSplitSupportedOnPlugins = pluginsService.isShardSplitAllowed(sourceShardId.getIndex()); @@ -150,14 +149,11 @@ public ClusterState applyShardSplitRequest( RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(curIndexMetadata); - List childShardIds = new ArrayList<>(); - int maxUsedShardId = curIndexMetadata.getNumberOfServingShards() + curIndexMetadata.getNumOfNonServingShards() - 1; - for (int i = 1; i <= request.getSplitInto(); i++) { - childShardIds.add(maxUsedShardId + i); - } + SplitShardsMetadata.Builder splitMetadataBuilder = new SplitShardsMetadata.Builder(curIndexMetadata.getSplitShardsMetadata()); + splitMetadataBuilder.splitShard(sourceShardId.id(), request.getSplitInto()); + indexMetadataBuilder.splitShardsMetadata(splitMetadataBuilder.build()); - indexMetadataBuilder.addChildShardsForSplittingShard(sourceShardId.id(), childShardIds); RoutingTable routingTable = routingTableBuilder.build(); metadataBuilder.put(indexMetadataBuilder); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/ShardRange.java b/server/src/main/java/org/opensearch/cluster/metadata/ShardRange.java new file mode 100644 index 0000000000000..9da2cbe6959a1 --- /dev/null +++ 
b/server/src/main/java/org/opensearch/cluster/metadata/ShardRange.java @@ -0,0 +1,134 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.metadata; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; + +public class ShardRange implements Comparable, ToXContentFragment, Writeable { + private final int shardId; + private final int start; + private final int end; + + public ShardRange(int shardId, int start, int end) { + this.shardId = shardId; + this.start = start; + this.end = end; + } + + /** + * Constructs a new shard range from a stream. 
+ * @param in the stream to read from + * @throws IOException if an error occurs while reading from the stream + * @see #writeTo(StreamOutput) + */ + public ShardRange(StreamInput in) throws IOException { + shardId = in.readVInt(); + start = in.readInt(); + end = in.readInt(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ShardRange)) return false; + + ShardRange that = (ShardRange) o; + + if (shardId != that.shardId) return false; + if (start != that.start) return false; + return end == that.end; + } + + @Override + public int hashCode() { + int result = shardId; + result = 31 * result + start; + result = 31 * result + end; + return result; + } + + public int getShardId() { + return shardId; + } + + public int getStart() { + return start; + } + + public int getEnd() { + return end; + } + + public ShardRange copy() { + return new ShardRange(shardId, start, end); + } + + public boolean contains(long hash) { + return hash >= start && hash <= end; + } + + @Override + public int compareTo(ShardRange o) { + return Integer.compare(start, o.start); + } + + @Override + public String toString() { + return "ShardRange{" + + "shardId=" + shardId + + ", start=" + start + + ", end=" + end + + '}'; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(shardId); + out.writeInt(start); + out.writeInt(end); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject() + .field("shard_id", shardId) + .field("start", start) + .field("end", end); + builder.endObject(); + return builder; + } + + public static ShardRange parse(XContentParser parser) throws IOException { + int shardId = -1, start = -1, end = -1; + XContentParser.Token token; + String fieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = 
parser.currentName(); + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if ("shard_id".equals(fieldName)) { + shardId = parser.intValue(); + } else if ("start".equals(fieldName)) { + start = parser.intValue(); + } else if ("end".equals(fieldName)) { + end = parser.intValue(); + } + } + } + + return new ShardRange(shardId, start, end); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/metadata/SplitMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/SplitMetadata.java deleted file mode 100644 index 3438a8a8d07d0..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/metadata/SplitMetadata.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.cluster.metadata; - -import org.opensearch.cluster.AbstractDiffable; -import org.opensearch.cluster.Diff; -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Objects; - - -public class SplitMetadata extends AbstractDiffable { - private final int parentShardId; - private final List childShardIds; - - private final int routingFactor; - private final int routingNumShards; - - public SplitMetadata(int parentShardId, List childShardIds, int parentRoutingFactor) { - this.parentShardId = parentShardId; - Collections.sort(childShardIds); - this.childShardIds = Collections.unmodifiableList(childShardIds); - int numChildShards = childShardIds.size(); - this.routingNumShards = calculateNumRoutingShards(parentRoutingFactor, numChildShards); - this.routingFactor = 
this.routingNumShards / numChildShards; - } - - public SplitMetadata(int parentShardId, List childShardIds, int routingFactor, int routingNumShards) { - this.parentShardId = parentShardId; - Collections.sort(childShardIds); - this.childShardIds = Collections.unmodifiableList(childShardIds); - this.routingFactor = routingFactor; - this.routingNumShards = routingNumShards; - } - - /** - * Calculate the number of routing shards for a given parent shard - * @param parentRoutingFactor routing factor of parent shard - * @param numOfChildShards number of child shards - * @return the number of routing shards - */ - public static int calculateNumRoutingShards(int parentRoutingFactor, int numOfChildShards) { - if(numOfChildShards > parentRoutingFactor) { - throw new IllegalArgumentException("Cannot split further"); - } - int x = parentRoutingFactor / numOfChildShards; - int log2OrfDivShards = 32 - Integer.numberOfLeadingZeros(x - 1); - - int numSplits = ((x & (x - 1)) == 0)? log2OrfDivShards: log2OrfDivShards -1; - return numOfChildShards * (1 << numSplits); - } - - public int getRoutingFactor() { - return routingFactor; - } - - public int getRoutingNumShards() { - return routingNumShards; - } - - public List getChildShards() { - return childShardIds; - } - - public Integer getChildShardIdAtIndex(int index) { - return childShardIds.get(index); - } - - public int getParentShardId() { - return parentShardId; - } - - public SplitMetadata(StreamInput in) throws IOException { - parentShardId = in.readVInt(); - childShardIds = in.readList(StreamInput::readVInt); - routingFactor = in.readVInt(); - routingNumShards = in.readVInt(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(parentShardId); - out.writeCollection(childShardIds, StreamOutput::writeVInt); - out.writeVInt(routingFactor); - out.writeVInt(routingNumShards); - } - - public static Diff readDiffFrom(StreamInput in) throws IOException { - return 
readDiffFrom(SplitMetadata::new, in); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof SplitMetadata)) return false; - SplitMetadata that = (SplitMetadata) o; - - return parentShardId == that.parentShardId && - routingFactor == that.routingFactor && - routingNumShards == that.routingNumShards && - Objects.equals(childShardIds, that.childShardIds); - } - - @Override - public int hashCode() { - return Objects.hash(parentShardId, childShardIds, routingFactor, routingNumShards); - } - - public void toXContent(XContentBuilder builder) throws IOException { - builder.startObject(String.valueOf(parentShardId)); - builder.startArray("child_shard_ids"); - for (final Integer childShard : childShardIds) { - builder.value(childShard); - } - builder.endArray(); - builder.field("routing_factor", routingFactor); - builder.field("routing_num_shards", routingNumShards); - builder.endObject(); - } - - public static SplitMetadata parse(XContentParser parser, String parentShardIdText) throws IOException { - int parentShardId = Integer.parseInt(parentShardIdText); - List childShardIds = new ArrayList<>(); - int routingFactor = 0; - int routingNumShards = 0; - XContentParser.Token token; - String currentFieldName = null; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (currentFieldName != null) { - switch (currentFieldName) { - case "child_shard_ids": - if (token == XContentParser.Token.START_ARRAY) { - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - childShardIds.add(parser.intValue()); - } - } - break; - case "routing_factor": - routingFactor = parser.intValue(); - break; - case "routing_num_shards": - routingNumShards = parser.intValue(); - break; - } - } - - } - return new SplitMetadata(parentShardId, childShardIds, routingFactor, routingNumShards); - } - -} diff --git 
a/server/src/main/java/org/opensearch/cluster/metadata/SplitShardsMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/SplitShardsMetadata.java new file mode 100644 index 0000000000000..7dffb089fa6fb --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/metadata/SplitShardsMetadata.java @@ -0,0 +1,448 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.metadata; + +import org.opensearch.cluster.AbstractDiffable; +import org.opensearch.cluster.Diff; +import org.opensearch.common.collect.Tuple; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class SplitShardsMetadata extends AbstractDiffable implements ToXContentFragment { + private static final int MINIMUM_RANGE_LENGTH_THRESHOLD = 1000; + public static final int SPLIT_NOT_IN_PROGRESS = -2; + + private static final String KEY_ROOT_SHARDS_TO_ALL_CHILDREN = "root_shards_to_all_children"; + private static final String KEY_NUMBER_OF_ROOT_SHARDS = "num_of_root_shards"; + private static final String KEY_TEMP_SHARD_ID_TO_CHILD_SHARDS = "temp_shard_id_to_child_shards"; + private static final String KEY_MAX_SHARD_ID = "max_shard_id"; + private static final String KEY_IN_PROGRESS_SPLIT_SHARD_ID = "in_progress_split_shard_id"; + + + + // Root shard id to flat list of all child shards under root. 
+ private final ShardRange[][] rootShardsToAllChildren; + // Mapping of a parent shard ID to children. This is a temporary map since a shard id of parent is reused + // in one of its children and triggering a split of a child which is using the shard id of parent can replace + // child shards of its parent with its own child shards. + private final Map parentToChildShards; + private final int maxShardId; + private final int inProgressSplitShardId; + + + + private SplitShardsMetadata(ShardRange[][] rootShardsToAllChildren, Map parentToChildShards, + int inProgressSplitShardId, int maxShardId) { + + this.rootShardsToAllChildren = rootShardsToAllChildren; + this.parentToChildShards = parentToChildShards; + this.maxShardId = maxShardId; + this.inProgressSplitShardId = inProgressSplitShardId; + } + + public SplitShardsMetadata(StreamInput in) throws IOException { + int numberOfRootShards = in.readVInt(); + this.rootShardsToAllChildren = new ShardRange[numberOfRootShards][]; + for (int i=0; i < numberOfRootShards; i++) { + this.rootShardsToAllChildren[i] = in.readOptionalArray(ShardRange::new, ShardRange[]::new); + } + this.maxShardId = in.readVInt(); + this.inProgressSplitShardId = in.readInt(); + this.parentToChildShards = in.readMap(StreamInput::readInt, i-> i.readArray(ShardRange::new, ShardRange[]::new)); + } + + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(rootShardsToAllChildren.length); + for (ShardRange[] rootShardsToAllChild : rootShardsToAllChildren) { + out.writeOptionalArray(rootShardsToAllChild); + } + out.writeVInt(this.maxShardId); + out.writeInt(this.inProgressSplitShardId); + out.writeMap(this.parentToChildShards, StreamOutput::writeInt, StreamOutput::writeArray); + } + + public int getShardIdOfHash(int rootShardId, int hash, boolean includeInProgressChildren) { + // First check if we have child shards against this root shard. 
+ if (rootShardsToAllChildren[rootShardId] == null) { + if (includeInProgressChildren && parentToChildShards.containsKey(rootShardId)) { + ShardRange shardRange = binarySearchShards(parentToChildShards.get(rootShardId), hash); + assert shardRange != null; + return shardRange.getShardId(); + } + return rootShardId; + } + + + ShardRange[] existingChildShards = rootShardsToAllChildren[rootShardId]; + ShardRange shardRange = binarySearchShards(existingChildShards, hash); + assert shardRange != null; + + if (includeInProgressChildren && parentToChildShards.containsKey(shardRange.getShardId())) { + shardRange = binarySearchShards(parentToChildShards.get(shardRange.getShardId()), hash); + } + assert shardRange != null; + + return shardRange.getShardId(); + } + + private ShardRange binarySearchShards(ShardRange[] childShards, int hash) { + int low = 0, high = childShards.length - 1; + while (low <= high) { + int mid = low + (high - low) / 2; + ShardRange midShard = childShards[mid]; + if (midShard.contains(hash)) { + return midShard; + } else if (hash < midShard.getStart()) { + high = mid - 1; + } else { + low = mid + 1; + } + } + return null; + } + + public int getNumberOfRootShards() { + return rootShardsToAllChildren.length; + } + + public int getNumberOfShards() { + return maxShardId + 1; + } + + public ShardRange[] getChildShardsOfParent(int shardId) { + if (parentToChildShards.containsKey(shardId) == false) { + return null; + } + + ShardRange[] childShards = new ShardRange[parentToChildShards.get(shardId).length]; + int childShardIdx = 0; + for (ShardRange childShard : parentToChildShards.get(shardId)) { + childShards[childShardIdx++] = childShard.copy(); + } + return childShards; + } + + public int numberOfEmptyParentShards() { + int emptyParents = parentToChildShards.size(); + if (inProgressSplitShardId != SPLIT_NOT_IN_PROGRESS) { + emptyParents -= 1; + } + return emptyParents; + } + + private static void validateShardRanges(int shardId, ShardRange[] shardRanges) { 
+ Integer start = null; + for (ShardRange shardRange : shardRanges) { + validateBounds(shardRange, start); + long rangeEnd = shardRange.getEnd(); + long rangeLength = rangeEnd - shardRange.getStart() + 1; + if (rangeLength < MINIMUM_RANGE_LENGTH_THRESHOLD) { + throw new IllegalArgumentException( + "Shard range from " + shardRange.getStart() + " to " + shardRange.getEnd() + + " is below shard range threshold of " + MINIMUM_RANGE_LENGTH_THRESHOLD); + } + + start = shardRange.getEnd(); + } + + if (start == null) { + throw new IllegalArgumentException("No shard range defined for child shards of shard " + shardId); + } + + if (start != Integer.MAX_VALUE) { + throw new IllegalArgumentException( + "Shard range from " + (start + 1) + " to " + Integer.MAX_VALUE + + " is missing from the list of shard ranges"); + } + } + + private static void validateBounds(ShardRange shardRange, Integer start) { + if (start == null) { + if (shardRange.getStart() != Integer.MIN_VALUE) { + throw new IllegalArgumentException( + "Shard range from " + Integer.MIN_VALUE + " to " + (shardRange.getStart() - 1) + + " is missing from the list of shard ranges"); + } + } else if (shardRange.getStart() != start + 1) { + throw new IllegalArgumentException( + "Shard range from " + (start + 1) + " to " + (shardRange.getStart() - 1) + + " is missing from the list of shard ranges"); + } + } + + public static class Builder { + private final ShardRange[][] rootShardsToAllChildren; + private final Map parentToChildShards; + private int maxShardId; + private int inProgressSplitShardId; + + public Builder(int numberOfShards) { + maxShardId = numberOfShards - 1; + rootShardsToAllChildren = new ShardRange[numberOfShards][]; + parentToChildShards = new HashMap<>(); + inProgressSplitShardId = SPLIT_NOT_IN_PROGRESS; + } + + public Builder(SplitShardsMetadata splitShardsMetadata) { + this.maxShardId = splitShardsMetadata.maxShardId; + + this.rootShardsToAllChildren = new 
ShardRange[splitShardsMetadata.rootShardsToAllChildren.length][]; + for (int i = 0; i < splitShardsMetadata.rootShardsToAllChildren.length; i++) { + if (splitShardsMetadata.rootShardsToAllChildren[i] != null) { + this.rootShardsToAllChildren[i] = new ShardRange[splitShardsMetadata.rootShardsToAllChildren[i].length]; + int j = 0; + for (ShardRange childShard : splitShardsMetadata.rootShardsToAllChildren[i]) { + this.rootShardsToAllChildren[i][j++] = childShard.copy(); + } + } + } + + this.parentToChildShards = new HashMap<>(); + for (Integer parentShardId : splitShardsMetadata.parentToChildShards.keySet()) { + // Getting a copy of child shards for this parent. + ShardRange[] childShards = splitShardsMetadata.getChildShardsOfParent(parentShardId); + this.parentToChildShards.put(parentShardId, childShards); + } + + inProgressSplitShardId = splitShardsMetadata.inProgressSplitShardId; + } + + /** + * Create metadata of new child shards for the provided shard id. + * @param splitShardId Shard id to split + * @param numberOfChildren Number of child shards this shard is going to have. 
+ */ + public void splitShard(int splitShardId, int numberOfChildren) { + if (inProgressSplitShardId != SPLIT_NOT_IN_PROGRESS || parentToChildShards.containsKey(splitShardId)) { + throw new IllegalArgumentException("Split of shard [" + inProgressSplitShardId + + "] is already in progress or completed."); + } + + inProgressSplitShardId = splitShardId; + + Tuple shardTuple = findRootAndShard(splitShardId, rootShardsToAllChildren); + if (shardTuple == null) { + throw new IllegalArgumentException("Invalid shard id provided for splitting"); + } + ShardRange parentShard = shardTuple.v2(); + + long rangeSize = ((long)parentShard.getEnd() - parentShard.getStart()) / numberOfChildren; + + if(rangeSize <= MINIMUM_RANGE_LENGTH_THRESHOLD) { + throw new IllegalArgumentException("Cannot split shard [" + splitShardId + "] further."); + } + + long start = parentShard.getStart(); + List newChildShardsList = new ArrayList<>(); + int nextChildShardId = maxShardId + 1; + for (int i = 0; i < numberOfChildren; ++i) { + long end = i == numberOfChildren - 1 ? parentShard.getEnd() : start + rangeSize; + int childShardId = nextChildShardId++; + ShardRange childShard = new ShardRange(childShardId, (int) start, (int) end); + newChildShardsList.add(childShard); + start = end + 1; + } + ShardRange[] newShardRanges = newChildShardsList.toArray(new ShardRange[0]); + validateShardRanges(splitShardId, newShardRanges); + + parentToChildShards.put(splitShardId, newShardRanges); + } + + public void updateSplitMetadataForChildShards(int sourceShardId, Set newChildShardIds) { + Tuple shardRangeTuple = findRootAndShard(sourceShardId, rootShardsToAllChildren); + assert shardRangeTuple != null; + + assert newChildShardIds.size() == parentToChildShards.get(sourceShardId).length; + for (ShardRange childShard : parentToChildShards.get(sourceShardId)) { + assert newChildShardIds.contains(childShard.getShardId()); + } + + List shardsUnderRoot = rootShardsToAllChildren[shardRangeTuple.v1()] == null ? 
new ArrayList<>() : + Arrays.asList(rootShardsToAllChildren[shardRangeTuple.v1()]); + shardsUnderRoot.remove(shardRangeTuple.v2()); + shardsUnderRoot.addAll(Arrays.asList(parentToChildShards.get(sourceShardId))); + ShardRange[] newShardsUnderRoot = shardsUnderRoot.toArray(new ShardRange[0]); + Arrays.sort(newShardsUnderRoot); + validateShardRanges(shardRangeTuple.v1(), newShardsUnderRoot); + + maxShardId += newChildShardIds.size(); + rootShardsToAllChildren[shardRangeTuple.v1()] = newShardsUnderRoot; + inProgressSplitShardId = SPLIT_NOT_IN_PROGRESS; + } + + public void cancelSplit(int sourceShardId) { + assert inProgressSplitShardId != SPLIT_NOT_IN_PROGRESS; + parentToChildShards.remove(sourceShardId); + inProgressSplitShardId = SPLIT_NOT_IN_PROGRESS; + } + + public SplitShardsMetadata build() { + return new SplitShardsMetadata(this.rootShardsToAllChildren, this.parentToChildShards, + this.inProgressSplitShardId, this.maxShardId); + } + } + + private static Tuple findRootAndShard(int shardId, ShardRange[][] rootShardsToAllChildren) { + ShardRange[] allChildren; + for (int rootShardId = 0; rootShardId < rootShardsToAllChildren.length; rootShardId++) { + allChildren = rootShardsToAllChildren[rootShardId]; + if (allChildren != null) { + for (ShardRange shardUnderRoot : allChildren) { + if (shardUnderRoot.getShardId() == shardId) { + return new Tuple<>(rootShardId, shardUnderRoot); + } + } + } + } + + if (shardId < rootShardsToAllChildren.length && rootShardsToAllChildren[shardId] == null) { + // We are splitting a root shard in this case. 
+ return new Tuple<>(shardId, new ShardRange(shardId, Integer.MIN_VALUE, Integer.MAX_VALUE)); + } + + return null; + } + + public int getInProgressSplitShardId() { + return inProgressSplitShardId; + } + + public boolean isSplitOfShardInProgress(int shardId) { + return inProgressSplitShardId == shardId; + } + + public boolean isEmptyParentShard(int shardId) { + return isSplitOfShardInProgress(shardId) == false && parentToChildShards.containsKey(shardId); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof SplitShardsMetadata)) return false; + + SplitShardsMetadata that = (SplitShardsMetadata) o; + + if (maxShardId != that.maxShardId) return false; + if (inProgressSplitShardId != that.inProgressSplitShardId) return false; + if (!Arrays.deepEquals(rootShardsToAllChildren, that.rootShardsToAllChildren)) return false; + return parentToChildShards.equals(that.parentToChildShards); + } + + @Override + public int hashCode() { + int result = Arrays.deepHashCode(rootShardsToAllChildren); + result = 31 * result + parentToChildShards.hashCode(); + result = 31 * result + maxShardId; + result = 31 * result + inProgressSplitShardId; + return result; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(KEY_NUMBER_OF_ROOT_SHARDS, rootShardsToAllChildren.length); + builder.field(KEY_MAX_SHARD_ID, maxShardId); + builder.field(KEY_IN_PROGRESS_SPLIT_SHARD_ID, inProgressSplitShardId); + builder.startObject(KEY_ROOT_SHARDS_TO_ALL_CHILDREN); + for (int rootShardId = 0; rootShardId < rootShardsToAllChildren.length; rootShardId++) { + ShardRange[] childShards = rootShardsToAllChildren[rootShardId]; + if (childShards != null) { + builder.startArray(String.valueOf(rootShardId)); + for (ShardRange childShard : childShards) { + childShard.toXContent(builder, params); + } + builder.endArray(); + } + } + builder.endObject(); + + 
builder.startObject(KEY_TEMP_SHARD_ID_TO_CHILD_SHARDS); + for (Integer parentShardId : parentToChildShards.keySet()) { + builder.startArray(String.valueOf(parentShardId)); + for (ShardRange childShard : parentToChildShards.get(parentShardId)) { + childShard.toXContent(builder, params); + } + builder.endArray(); + } + builder.endObject(); + + return builder; + } + + public static SplitShardsMetadata parse(XContentParser parser) throws IOException { + XContentParser.Token token; + String currentFieldName = null; + int maxShardId = -1, inProgressSplitShardId = SPLIT_NOT_IN_PROGRESS; + ShardRange[][] rootShardsToAllChildren = null; + Map tempShardIdToChildShards = new HashMap<>(); + int numberOfRootShards = -1; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (KEY_MAX_SHARD_ID.equals(currentFieldName)) { + maxShardId = parser.intValue(); + } else if (KEY_IN_PROGRESS_SPLIT_SHARD_ID.equals(currentFieldName)) { + inProgressSplitShardId = parser.intValue(); + } else if (KEY_NUMBER_OF_ROOT_SHARDS.equals(currentFieldName)) { + numberOfRootShards = parser.intValue(); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (KEY_ROOT_SHARDS_TO_ALL_CHILDREN.equals(currentFieldName)) { + Map rootShards = parseShardsMap(parser); + rootShardsToAllChildren = new ShardRange[numberOfRootShards][]; + for (Map.Entry entry : rootShards.entrySet()) { + rootShardsToAllChildren[entry.getKey()] = entry.getValue(); + } + } else if (KEY_TEMP_SHARD_ID_TO_CHILD_SHARDS.equals(currentFieldName)) { + tempShardIdToChildShards = parseShardsMap(parser); + } + } + } + + return new SplitShardsMetadata(rootShardsToAllChildren, tempShardIdToChildShards, inProgressSplitShardId, maxShardId); + } + + private static Map parseShardsMap(XContentParser parser) throws IOException { + XContentParser.Token token; + String 
currentFieldName = null; + Map shardsMap = new HashMap<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_ARRAY) { + List childShardRanges = new ArrayList<>(); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + ShardRange shardRange = ShardRange.parse(parser); + childShardRanges.add(shardRange); + } + assert currentFieldName != null; + Integer parentShard = Integer.parseInt(currentFieldName); + shardsMap.put(parentShard, childShardRanges.toArray(new ShardRange[0])); + } + } + + return shardsMap; + } + + public static Diff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(SplitShardsMetadata::new, in); + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java index 4d19998a2986f..b7476e7fc814d 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java @@ -125,10 +125,12 @@ boolean validate(Metadata metadata) { } // check the number of shards - if (indexMetadata.getNumberOfServingShards() != shards().size()) { + if (indexMetadata.getNumberOfShards() - indexMetadata.getSplitShardsMetadata().numberOfEmptyParentShards() != shards().size()) { Set expected = new HashSet<>(); - for (int shardId : indexMetadata.getServingShardIds()) { - expected.add(shardId); + for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { + if (indexMetadata.getSplitShardsMetadata().isEmptyParentShard(i) == false) { + expected.add(i); + } } for (IndexShardRoutingTable indexShardRoutingTable : this) { expected.remove(indexShardRoutingTable.shardId().id()); @@ -556,7 +558,7 @@ private Builder initializeEmpty(IndexMetadata indexMetadata, UnassignedInfo unas if 
(!shards.isEmpty()) { throw new IllegalStateException("trying to initialize an index with fresh shards, but already has shards created"); } - for (Integer shardNumber : indexMetadata.getServingShardIds()) { + for (int shardNumber = 0; shardNumber < indexMetadata.getNumberOfShards(); shardNumber++) { ShardId shardId = new ShardId(index, shardNumber); final RecoverySource primaryRecoverySource; if (indexMetadata.inSyncAllocationIds(shardNumber).isEmpty() == false) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index 71a2c41d668ea..279f139e3a485 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -34,7 +34,6 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.metadata.SplitMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; @@ -449,7 +448,7 @@ protected IndexShardRoutingTable shards(ClusterState clusterState, String index, public ShardId shardWithRecoveringChild(ClusterState clusterState, String index, String id, String routing, Index shardIndex) { - int shardId = generateShardId(indexMetadata(clusterState, index), id, routing, (shard) -> true); + int shardId = generateShardId(indexMetadata(clusterState, index), id, routing, true); return new ShardId(shardIndex, shardId); } @@ -459,11 +458,11 @@ public ShardId shardId(ClusterState clusterState, String index, String id, @Null } public static int generateShardId(IndexMetadata indexMetadata, @Nullable String id, @Nullable String routing) { - return generateShardId(indexMetadata, id, routing, indexMetadata::isNonServingShard); + return 
generateShardId(indexMetadata, id, routing, false); } public static int generateShardId(IndexMetadata indexMetadata, @Nullable String id, @Nullable String routing, - Predicate shouldIncludeChildShards) { + boolean includeInProgressChild) { final String effectiveRouting; final int partitionOffset; @@ -481,27 +480,21 @@ public static int generateShardId(IndexMetadata indexMetadata, @Nullable String partitionOffset = 0; } - return calculateShardIdOfChild(indexMetadata, effectiveRouting, partitionOffset, shouldIncludeChildShards); + return calculateShardIdOfChild(indexMetadata, effectiveRouting, partitionOffset, includeInProgressChild); } private static int calculateScaledShardId(IndexMetadata indexMetadata, String effectiveRouting, int partitionOffset) { - return calculateShardIdOfChild(indexMetadata, effectiveRouting, partitionOffset, indexMetadata::isNonServingShard); + return calculateShardIdOfChild(indexMetadata, effectiveRouting, partitionOffset, false); } private static int calculateShardIdOfChild(IndexMetadata indexMetadata, String effectiveRouting, int partitionOffset, - Predicate canIncludeChildShardIds) { + boolean includeInProgressChild) { final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset; - int shardId = Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor(); - - while (indexMetadata.isParentShard(shardId) && canIncludeChildShardIds.test(shardId)) { - final SplitMetadata splitMetadata = indexMetadata.getSplitMetadata(shardId); - int childShardIdx = Math.floorMod(hash, splitMetadata.getRoutingNumShards()) / splitMetadata.getRoutingFactor(); - shardId = splitMetadata.getChildShardIdAtIndex(childShardIdx); - } - // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size // of original index to hash documents - return shardId; + int rootShardId = Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor(); + + 
return indexMetadata.getSplitShardsMetadata().getShardIdOfHash(rootShardId, hash, includeInProgressChild); } private void checkPreferenceBasedRoutingAllowed(Preference preference, @Nullable WeightedRoutingMetadata weightedRoutingMetadata) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 54660c4b87a06..6fe163a51e32b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -37,6 +37,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.ShardRange; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator; @@ -290,7 +291,7 @@ private ShardRouting findAssignedPrimaryIfPeerRecoveryOrSplit(ShardRouting routi } if (routing.isSplitTarget()) { - List shardRoutings = assignedShards.get(routing.getSplittingShardId()); + List shardRoutings = assignedShards.get(routing.getParentShardId()); if (shardRoutings != null) { for (ShardRouting shardRouting : shardRoutings) { if (shardRouting.primary()) { @@ -298,7 +299,7 @@ private ShardRouting findAssignedPrimaryIfPeerRecoveryOrSplit(ShardRouting routi return shardRouting; } else if (primary == null) { primary = shardRouting; - } else if (primary.getRecoveringChildShardIds() != null) { + } else if (primary.getRecoveringChildShardRanges() != null) { primary = shardRouting; } } @@ -597,7 +598,8 @@ public Tuple> splitShard( ) { ensureMutable(); splittingShards++; - ShardRouting source = startedShard.split(indexMetadata.getChildShardIds(startedShard.shardId().id()), expectedShardSize); + ShardRange[] childShardRanges = 
indexMetadata.getSplitShardsMetadata().getChildShardsOfParent(startedShard.shardId().id()); + ShardRouting source = startedShard.split(childShardRanges, expectedShardSize); updateAssigned(startedShard, source); ShardRouting[] childShards = source.getRecoveringChildShards(); List childShardsList = Arrays.asList(childShards); @@ -620,7 +622,7 @@ public void startInPlaceChildShards( ensureMutable(); assert !childShards.isEmpty(); ShardRouting parentShard = getByAllocationId( - childShards.get(0).getSplittingShardId(), + childShards.get(0).getParentShardId(), childShards.get(0).allocationId().getParentAllocationId() ); int validShardEvents = 0, invalidShardEvents = 0; @@ -639,8 +641,8 @@ public void startInPlaceChildShards( + "], Number of missing child shards in started shard event: [" + (parentShard.getRecoveringChildShards().length - validShardEvents) + "], Parent shard is valid: [" - + (indexMetadata.isParentShard(parentShard.shardId().id()) == true) - + "]. Failing all child shards and cancelling relocation." + + (indexMetadata.getSplitShardsMetadata().isSplitOfShardInProgress(parentShard.shardId().id()) == true) + + "]. Failing all child shards and cancelling split." ); // We just need to fail one child shard because failShard ensures that failure of any child shard // fails all child shards and cancels split of source shard. @@ -741,6 +743,10 @@ public void failShard( RoutingChangesObserver routingChangesObserver ) { ensureMutable(); + if (failedShard.isSplitTarget() && getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId()) == null) { + // We already removed this child when parent failed. 
+ return; + } assert failedShard.assignedToNode() : "only assigned shards can be failed"; assert indexMetadata.getIndex().equals(failedShard.index()) : "shard failed for unknown index (shard entry: " + failedShard + ")"; assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId()) == failedShard @@ -816,7 +822,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId } } else if (failedShardAllocId.getParentAllocationId() != null) { ShardRouting sourceShard = getByAllocationId( - failedShard.getSplittingShardId(), + failedShard.getParentShardId(), failedShard.allocationId().getParentAllocationId() ); // If source shard is not splitting then we must have failed it in previous iteration of child shard. @@ -1460,7 +1466,6 @@ private static void assertRecoveriesPerNode( boolean verifyOutgoingRecoveries, Function incomingCountFilter ) { - Set splittingShards = new HashSet<>(); for (Map.Entry recoveries : recoveriesPerNode.entrySet()) { String node = recoveries.getKey(); final Recoveries value = recoveries.getValue(); @@ -1479,12 +1484,11 @@ private static void assertRecoveriesPerNode( if (assigned.initializing() && assigned.recoverySource().getType() == RecoverySource.Type.PEER) { outgoing++; } else if (assigned.splitting()) { - assert assigned.getRecoveringChildShards().length == assigned.getRecoveringChildShardIds().length; + assert assigned.getRecoveringChildShards().length == assigned.getRecoveringChildShardRanges().length; for (ShardRouting childShardRouting : assigned.getRecoveringChildShards()) { assert routingNodes.assignedShards.containsKey(childShardRouting.shardId()); for (ShardRouting assignedChildShard : routingNodes.assignedShards.get(childShardRouting.shardId())) { - assert assignedChildShard.primary() == false - || assignedChildShard.getSplittingShardId().equals(assigned.shardId()); + assert assignedChildShard.getParentShardId().equals(assigned.shardId()); } } outgoing++; diff --git 
a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java index 636bfa809af20..ddab0895bb680 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java @@ -33,6 +33,7 @@ package org.opensearch.cluster.routing; import org.opensearch.Version; +import org.opensearch.cluster.metadata.ShardRange; import org.opensearch.cluster.routing.RecoverySource.ExistingStoreRecoverySource; import org.opensearch.cluster.routing.RecoverySource.PeerRecoverySource; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; @@ -78,11 +79,12 @@ public class ShardRouting implements Writeable, ToXContentObject { @Nullable private final ShardRouting targetRelocatingShard; @Nullable - private final ShardId[] recoveringChildShardIds; + private final ShardRange[] recoveringChildShardRanges; @Nullable private final ShardRouting[] recoveringChildShards; @Nullable - private final ShardId splittingShardId; + private final ShardId parentShardId; + private final ShardRange shardRange; /** * A constructor to internally create shard routing instances, note, the internal flag should only be set to true @@ -98,8 +100,9 @@ protected ShardRouting( UnassignedInfo unassignedInfo, AllocationId allocationId, long expectedShardSize, - ShardId[] childShardIds, - ShardId splittingShardId + ShardRange shardRange, + ShardId parentShardId, + ShardRange[] childShardRanges ) { this.shardId = shardId; this.currentNodeId = currentNodeId; @@ -111,9 +114,10 @@ protected ShardRouting( this.allocationId = allocationId; this.expectedShardSize = expectedShardSize; this.targetRelocatingShard = initializeTargetRelocatingShard(); - this.recoveringChildShardIds = childShardIds; + this.recoveringChildShardRanges = childShardRanges; this.recoveringChildShards = initializeRecoveringChildShards(); - this.splittingShardId = 
splittingShardId; + this.parentShardId = parentShardId; + this.shardRange = shardRange; this.asList = Collections.singletonList(this); assert expectedShardSize == UNAVAILABLE_EXPECTED_SHARD_SIZE || state == ShardRoutingState.INITIALIZING @@ -145,6 +149,7 @@ private ShardRouting initializeTargetRelocatingShard() { unassignedInfo, AllocationId.newTargetRelocation(allocationId), expectedShardSize, + shardRange, null, null ); @@ -160,7 +165,7 @@ private ShardRouting[] initializeRecoveringChildShards() { ShardRouting[] childShards = new ShardRouting[childShardAllocIds.size()]; for (int idx = 0; idx < childShardAllocIds.size(); idx++) { childShards[idx] = new ShardRouting( - recoveringChildShardIds[idx], + new ShardId(shardId.getIndex(), recoveringChildShardRanges[idx].getShardId()), currentNodeId, null, primary, @@ -169,8 +174,9 @@ private ShardRouting[] initializeRecoveringChildShards() { unassignedInfo, AllocationId.newTargetSplit(allocationId, childShardAllocIds.get(idx)), expectedShardSize, - null, - shardId + recoveringChildShardRanges[idx], + shardId, + null ); } return childShards; @@ -199,6 +205,7 @@ public static ShardRouting newUnassigned( null, UNAVAILABLE_EXPECTED_SHARD_SIZE, null, + null, null ); } @@ -380,13 +387,15 @@ public ShardRouting(ShardId shardId, StreamInput in) throws IOException { expectedShardSize = shardSize; asList = Collections.singletonList(this); targetRelocatingShard = initializeTargetRelocatingShard(); - splittingShardId = null; + parentShardId = null; if (in.getVersion().onOrAfter(Version.V_3_0_0)) { - recoveringChildShardIds = in.readOptionalArray(ShardId::new, ShardId[]::new); + shardRange = in.readOptionalWriteable(ShardRange::new); + recoveringChildShardRanges = in.readOptionalArray(ShardRange::new, ShardRange[]::new); recoveringChildShards = initializeRecoveringChildShards(); } else { recoveringChildShards = null; - recoveringChildShardIds = null; + recoveringChildShardRanges = null; + shardRange = null; } } @@ -418,9 +427,10 @@ 
public void writeToThin(StreamOutput out) throws IOException { out.writeLong(expectedShardSize); } if (out.getVersion().onOrAfter(Version.V_3_0_0)) { - out.writeOptionalArray(recoveringChildShardIds); + out.writeOptionalWriteable(shardRange); + out.writeOptionalArray(recoveringChildShardRanges); } else { - if (recoveringChildShardIds != null) { + if (recoveringChildShardRanges != null) { // In-progress shard split is not allowed in a mixed cluster where node(s) with an unsupported split // version is present. Hence, we also don't want to allow a node with an unsupported version // to get this state while shard split is in-progress. @@ -448,8 +458,9 @@ public ShardRouting updateUnassigned(UnassignedInfo unassignedInfo, RecoverySour unassignedInfo, allocationId, expectedShardSize, - recoveringChildShardIds, - splittingShardId + shardRange, + null, + null ); } @@ -478,6 +489,7 @@ public ShardRouting moveToUnassigned(UnassignedInfo unassignedInfo) { unassignedInfo, null, UNAVAILABLE_EXPECTED_SHARD_SIZE, + shardRange, null, null ); @@ -507,6 +519,7 @@ public ShardRouting initialize(String nodeId, @Nullable String existingAllocatio unassignedInfo, allocationId, expectedShardSize, + shardRange, null, null ); @@ -529,6 +542,7 @@ public ShardRouting relocate(String relocatingNodeId, long expectedShardSize) { null, AllocationId.newRelocation(allocationId), expectedShardSize, + shardRange, null, null ); @@ -538,13 +552,8 @@ public ShardRouting relocate(String relocatingNodeId, long expectedShardSize) { * Split the shard. 
* */ - public ShardRouting split(List childShardIds, long expectedShardSize) { + public ShardRouting split(ShardRange[] shardRanges, long expectedShardSize) { assert state == ShardRoutingState.STARTED : "current shard has to be started in order to be split " + this; - ShardId[] childShards = new ShardId[childShardIds.size()]; - int idx = 0; - for (Integer childShardId : childShardIds) { - childShards[idx++] = new ShardId(shardId.getIndex(), childShardId); - } return new ShardRouting( shardId, @@ -554,10 +563,11 @@ public ShardRouting split(List childShardIds, long expectedShardSize) { ShardRoutingState.SPLITTING, recoverySource, null, - AllocationId.newSplit(allocationId, childShardIds.size()), - expectedShardSize / childShards.length, - childShards, - null + AllocationId.newSplit(allocationId, shardRanges.length), + expectedShardSize, + shardRange, + null, + shardRanges ); } @@ -579,6 +589,7 @@ public ShardRouting cancelRelocation() { null, AllocationId.cancelRelocation(allocationId), UNAVAILABLE_EXPECTED_SHARD_SIZE, + shardRange, null, null ); @@ -602,6 +613,7 @@ public ShardRouting cancelSplit() { null, AllocationId.cancelSplit(allocationId), UNAVAILABLE_EXPECTED_SHARD_SIZE, + shardRange, null, null ); @@ -627,6 +639,7 @@ public ShardRouting removeRelocationSource() { unassignedInfo, AllocationId.finishRelocation(allocationId), expectedShardSize, + shardRange, null, null ); @@ -649,6 +662,7 @@ public ShardRouting reinitializeReplicaShard() { unassignedInfo, AllocationId.newInitializing(), expectedShardSize, + shardRange, null, null ); @@ -679,6 +693,7 @@ public ShardRouting moveToStarted() { null, allocationId, UNAVAILABLE_EXPECTED_SHARD_SIZE, + shardRange, null, null ); @@ -704,8 +719,9 @@ public ShardRouting moveActivePrimaryToReplica() { unassignedInfo, allocationId, expectedShardSize, - recoveringChildShardIds, - splittingShardId + shardRange, + parentShardId, + recoveringChildShardRanges ); } @@ -729,8 +745,9 @@ public ShardRouting 
moveActiveReplicaToPrimary() { unassignedInfo, allocationId, expectedShardSize, - recoveringChildShardIds, - splittingShardId + shardRange, + parentShardId, + recoveringChildShardRanges ); } @@ -754,8 +771,9 @@ public ShardRouting moveUnassignedFromPrimary() { unassignedInfo, allocationId, expectedShardSize, - recoveringChildShardIds, - splittingShardId + shardRange, + parentShardId, + recoveringChildShardRanges ); } @@ -785,7 +803,7 @@ public boolean isRelocationTarget() { * (i.e., was created with {@link #initializeRecoveringChildShards()} ()} */ public boolean isSplitTarget() { - return state == ShardRoutingState.INITIALIZING && getSplittingShardId() != null; + return state == ShardRoutingState.INITIALIZING && getParentShardId() != null; } /** @@ -812,7 +830,7 @@ public boolean isSplitTargetOf(ShardRouting other) { + other + "]"; - assert b == false || this.getSplittingShardId().equals(other.shardId()) + assert b == false || this.getParentShardId().equals(other.shardId()) : "ShardRouting is a splitting target but current splitting shard id isn't equal to source shard id." + " This [" + this @@ -902,7 +920,7 @@ public boolean isSplitSourceOf(ShardRouting other) { + other + "]"; - assert b == false || this.shardId.equals(other.getSplittingShardId()) + assert b == false || this.shardId.equals(other.getParentShardId()) : "ShardRouting is a splitting source but current shard id isn't equal to target splitting shard id." + " This [" + this @@ -994,7 +1012,7 @@ public boolean equalsIgnoringMetadata(ShardRouting other) { if (recoverySource != null ? !recoverySource.equals(other.recoverySource) : other.recoverySource != null) { return false; } - if (Arrays.equals(recoveringChildShardIds, other.recoveringChildShardIds) == false) { + if (Arrays.equals(recoveringChildShardRanges, other.recoveringChildShardRanges) == false) { return false; } return true; @@ -1033,7 +1051,7 @@ public int hashCode() { h = 31 * h + (recoverySource != null ? 
recoverySource.hashCode() : 0); h = 31 * h + (allocationId != null ? allocationId.hashCode() : 0); h = 31 * h + (unassignedInfo != null ? unassignedInfo.hashCode() : 0); - h = 31 * h + (recoveringChildShardIds != null ? Arrays.hashCode(recoveringChildShardIds) : 0); + h = 31 * h + (recoveringChildShardRanges != null ? Arrays.hashCode(recoveringChildShardRanges) : 0); hashCode = h; } return h; @@ -1072,9 +1090,9 @@ public String shortSummary() { if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE) { sb.append(", expected_shard_size[").append(expectedShardSize).append("]"); } - if (recoveringChildShardIds != null) { - sb.append(", recovering_child_shard ids["); - for (ShardId childShard : recoveringChildShardIds) { + if (recoveringChildShardRanges != null) { + sb.append(", recovering_child_shards ["); + for (ShardRange childShard : recoveringChildShardRanges) { sb.append(childShard.toString()); } sb.append("]"); @@ -1104,9 +1122,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (unassignedInfo != null) { unassignedInfo.toXContent(builder, params); } - if (recoveringChildShardIds != null) { - builder.startArray("recovering_child_shard_ids"); - for (ShardId childShardId : recoveringChildShardIds) { + if (recoveringChildShardRanges != null) { + builder.startArray("recovering_child_shard_ranges"); + for (ShardRange childShardId : recoveringChildShardRanges) { childShardId.toXContent(builder, params); } builder.endArray(); @@ -1146,11 +1164,15 @@ public ShardRouting[] getRecoveringChildShards() { return recoveringChildShards; } - public ShardId[] getRecoveringChildShardIds() { - return recoveringChildShardIds; + public ShardRange getShardRange() { + return shardRange; + } + + public ShardRange[] getRecoveringChildShardRanges() { + return recoveringChildShardRanges; } - public ShardId getSplittingShardId() { - return splittingShardId; + public ShardId getParentShardId() { + return parentShardId; } } diff --git 
a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index 70890ef2a66f6..16a400e842984 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -615,8 +615,8 @@ private void applyStartedShards(RoutingAllocation routingAllocation, List new ArrayList<>()); - inPlaceChildShards.get(startedShard.getSplittingShardId()).add(startedShard); + inPlaceChildShards.computeIfAbsent(startedShard.getParentShardId(), k -> new ArrayList<>()); + inPlaceChildShards.get(startedShard.getParentShardId()).add(startedShard); } else { routingNodes.startShard(logger, startedShard, routingAllocation.changes()); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java index 9d1d9ed2567b5..c1c00a3448787 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java @@ -172,7 +172,7 @@ public Metadata applyChanges(Metadata oldMetadata, RoutingTable newRoutingTable) indexMetadataBuilder = updatePrimaryTerm(oldIndexMetadata, indexMetadataBuilder, shardId, updates); } // Invoke metadata update of in-place split only for the parent shard. 
- if (updates.addedChildShards.isEmpty() == false) { + if (updates.addedChildShards.isEmpty() == false && updates.splitFailed == false) { indexMetadataBuilder = updateMetadataForInPlaceSplitCompleted(oldIndexMetadata, indexMetadataBuilder, shardId, updates); } if (updates.splitFailed) { @@ -414,7 +414,7 @@ private IndexMetadata.Builder updateMetadataForInPlaceSplitFailed(IndexMetadata if (indexMetadataBuilder == null) { indexMetadataBuilder = IndexMetadata.builder(oldIndexMetadata); } - return indexMetadataBuilder.removeParentToChildShardMetadata(parentShardId.id()); + return indexMetadataBuilder.cancelSplit(parentShardId.id()); } /** @@ -444,7 +444,7 @@ private static class Updates { private boolean increaseTerm; // whether primary term should be increased // Child shard ids for this shard which is now split. To be added in in-sync, assign primary term of this shard // and update number of current shards. - private Map addedChildShards = new HashMap<>(); + private final Map addedChildShards = new HashMap<>(); private boolean isNewChildShard; private boolean splitFailed; private Set addedAllocationIds = new HashSet<>(); // allocation ids that should be added to the in-sync set diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/InPlaceShardSplitAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/InPlaceShardSplitAllocationDecider.java index d7299378d606a..2924efd1781d9 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/InPlaceShardSplitAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/InPlaceShardSplitAllocationDecider.java @@ -8,6 +8,7 @@ package org.opensearch.cluster.routing.allocation.decider; +import org.opensearch.cluster.metadata.SplitShardsMetadata; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.ShardRouting; import 
org.opensearch.cluster.routing.allocation.RoutingAllocation; @@ -24,7 +25,8 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl public static Decision canRemainDecision(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { // If shardRouting is a started parent shard and fact that it exists is sufficient to conclude // that it needs to be split. - if (allocation.metadata().getIndexSafe(shardRouting.index()).isParentShard(shardRouting.shardId().id()) + SplitShardsMetadata splitShardsMetadata = allocation.metadata().getIndexSafe(shardRouting.index()).getSplitShardsMetadata(); + if (splitShardsMetadata.isSplitOfShardInProgress(shardRouting.shardId().id()) && shardRouting.started() && allocation.changes().isSplitOfShardFailed(shardRouting) == false) { return Decision.SPLIT; } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 6ac6b1418c0a7..431ecd3e7e053 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -34,7 +34,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.DirectoryReader; @@ -66,7 +65,6 @@ import org.opensearch.action.admin.indices.upgrade.post.UpgradeRequest; import org.opensearch.action.bulk.TransportShardBulkAction; import org.opensearch.action.support.replication.PendingReplicationActions; -import org.opensearch.action.support.replication.ReplicationOperation; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.cluster.metadata.DataStream; import org.opensearch.cluster.metadata.IndexMetadata; @@ -431,7 +429,7 @@ public IndexShard( final String aId = 
shardRouting.allocationId().getId(); final long primaryTerm; if (shardRouting.isSplitTarget()) { - primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardRouting.getSplittingShardId().id()); + primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardRouting.getParentShardId().id()); } else { primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardId.id()); } @@ -2399,7 +2397,7 @@ private Translog.Operation overrideToNoOpIfOperationNotForShard(Translog.Operati final Translog.Index index = (Translog.Index) operation; int computedShardId = OperationRouting.generateShardId(indexSettings().getIndexMetadata(), - index.id(), index.routing(), (shardId) -> true); + index.id(), index.routing(), true); if (computedShardId != shardId().id()) { return new Translog.NoOp(index.seqNo(), index.primaryTerm(), "op belongs to another child shard"); } diff --git a/server/src/main/java/org/opensearch/index/shard/ShardSplittingQuery.java b/server/src/main/java/org/opensearch/index/shard/ShardSplittingQuery.java index 7e760177f0d4d..f5274c374f467 100644 --- a/server/src/main/java/org/opensearch/index/shard/ShardSplittingQuery.java +++ b/server/src/main/java/org/opensearch/index/shard/ShardSplittingQuery.java @@ -79,14 +79,14 @@ final class ShardSplittingQuery extends Query { private final IndexMetadata indexMetadata; private final int shardId; private final BitSetProducer nestedParentBitSetProducer; - private final Predicate shouldIncludeChildShards; + private final boolean includeInProgressChild; ShardSplittingQuery(IndexMetadata indexMetadata, int shardId, boolean hasNested, - Predicate shouldIncludeChildShards) { + boolean includeInProgressChild) { this.indexMetadata = indexMetadata; this.shardId = shardId; this.nestedParentBitSetProducer = hasNested ? 
newParentDocBitSetProducer(indexMetadata.getCreationVersion()) : null; - this.shouldIncludeChildShards = shouldIncludeChildShards; + this.includeInProgressChild = includeInProgressChild; } @Override @@ -107,7 +107,7 @@ public Scorer scorer(LeafReaderContext context) throws IOException { indexMetadata, Uid.decodeId(ref.bytes, ref.offset, ref.length), null, - shouldIncludeChildShards + includeInProgressChild ); return shardId == targetShardId; }; @@ -151,7 +151,7 @@ public Scorer scorer(LeafReaderContext context) throws IOException { }; // in the _routing case we first go and find all docs that have a routing value and mark the ones we have to delete findSplitDocs(RoutingFieldMapper.NAME, ref -> { - int targetShardId = OperationRouting.generateShardId(indexMetadata, null, ref.utf8ToString(), shouldIncludeChildShards); + int targetShardId = OperationRouting.generateShardId(indexMetadata, null, ref.utf8ToString(), includeInProgressChild); return shardId == targetShardId; }, leafReader, maybeWrapConsumer.apply(bitSet::set)); @@ -292,7 +292,7 @@ boolean matches(int doc) throws IOException { leftToVisit = 2; leafReader.storedFields().document(doc, this); assert id != null : "docID must not be null - we might have hit a nested document"; - int targetShardId = OperationRouting.generateShardId(indexMetadata, id, routing, shouldIncludeChildShards); + int targetShardId = OperationRouting.generateShardId(indexMetadata, id, routing, includeInProgressChild); return targetShardId != shardId; } } diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 0d95624bb489d..4b5af94d68762 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -229,7 +229,7 @@ public static void addIndices( Tuple addIndexDirectoryTuple = new Tuple<>(true, statsDirectory); addIndices(indexRecoveryStats, 
indexSort, sources, maxSeqNo, maxSeqNo, maxUnsafeAutoIdTimestamp, indexMetadata, - shardId, split, hasNested, addIndexDirectoryTuple, indexMetadata::isNonServingShard, + shardId, split, hasNested, addIndexDirectoryTuple, false, IndexWriterConfig.OpenMode.CREATE); } @@ -245,7 +245,7 @@ public static void addIndices( boolean split, boolean hasNested, Tuple addIndexDirectoryTuple, - Predicate shouldIncludeChildShards, + boolean includeInProgressChild, IndexWriterConfig.OpenMode openMode ) throws IOException { assert sources.length > 0; @@ -269,7 +269,7 @@ public static void addIndices( } indexRecoveryStats.setFileDetailsComplete(); if (split) { - writer.deleteDocuments(new ShardSplittingQuery(indexMetadata, shardId, hasNested, shouldIncludeChildShards)); + writer.deleteDocuments(new ShardSplittingQuery(indexMetadata, shardId, hasNested, includeInProgressChild)); } /* * We set the maximum sequence number and the local checkpoint on the target to the maximum of the maximum sequence numbers on diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index d240eae55bbf7..094a95414c4c3 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -664,11 +664,11 @@ private void createOrUpdateShards(final ClusterState state) { Shard shard = indexService.getShardOrNull(shardId.id()); if (shard == null) { if (shardRouting.isSplitTarget()) { - Shard sourceShard = indexService.getShardOrNull(shardRouting.getSplittingShardId().id()); - assert sourceShard!= null : "Source shard not found for shard id " + shardRouting.getSplittingShardId(); - childShardRoutings.computeIfAbsent(shardRouting.getSplittingShardId(), k -> + Shard sourceShard = indexService.getShardOrNull(shardRouting.getParentShardId().id()); + assert sourceShard!= null 
: "Source shard not found for shard id " + shardRouting.getParentShardId(); + childShardRoutings.computeIfAbsent(shardRouting.getParentShardId(), k -> new Tuple<>(sourceShard.routingEntry(), new ArrayList<>())); - childShardRoutings.get(shardRouting.getSplittingShardId()).v2().add(shardRouting); + childShardRoutings.get(shardRouting.getParentShardId()).v2().add(shardRouting); } else { assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards"; createShard(nodes, routingTable, shardRouting, state); @@ -727,7 +727,7 @@ private void createChildShardsForSplit(DiscoveryNodes nodes, Map { routings.v2().forEach(routing -> - failAndRemoveShard(routing, false, "failed to create child shards", e, state)); + failAndRemoveChildShards(routing, false, "failed to create child shards", e, state)); replicationListener.onFailure(null, new RecoveryFailedException(request, e.getCause()), true); }); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoveryService.java b/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoveryService.java index c2a9137cded05..85226e7c15949 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoveryService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoveryService.java @@ -254,7 +254,7 @@ private ShardId getSplittingSourceShardId(IndexShard shard) { if (shard.routingEntry().splitting()) { return shard.shardId(); } else if (shard.routingEntry().isSplitTarget()) { - return shard.routingEntry().getSplittingShardId(); + return shard.routingEntry().getParentShardId(); } return null; } diff --git a/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoveryTargetHandler.java index 
5c9a66ebf503d..7e39f973d103c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoveryTargetHandler.java @@ -408,7 +408,7 @@ private void split(long localCheckpoint, long maxSeqNo, long maxUnsafeAutoIdTime true, context.getIndexShard().mapperService().hasNested(), addIndexSplitDirectory, - (shardId) -> true, + true, IndexWriterConfig.OpenMode.APPEND ); } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java index 536406539b540..c58c71837f3f4 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java @@ -39,6 +39,8 @@ import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.admin.indices.stats.ShardStats; import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.SplitShardsMetadata; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.common.Table; @@ -69,6 +71,7 @@ import java.time.Instant; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.function.Function; import static java.util.Arrays.asList; diff --git a/server/src/main/java/org/opensearch/snapshots/RestoreService.java b/server/src/main/java/org/opensearch/snapshots/RestoreService.java index 6b9373d6f60fd..9bbe84ac76978 100644 --- a/server/src/main/java/org/opensearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/opensearch/snapshots/RestoreService.java @@ -690,7 +690,8 @@ private void validateExistingIndex( ); } // Make sure that the number of shards is the same. 
That's the only thing that we cannot change - if (currentIndexMetadata.getNumberOfSeedShards() != snapshotIndexMetadata.getNumberOfSeedShards()) { + if (currentIndexMetadata.getSplitShardsMetadata().getNumberOfRootShards() != + snapshotIndexMetadata.getSplitShardsMetadata().getNumberOfRootShards()) { throw new SnapshotRestoreException( snapshot, "cannot restore index [" diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index b3f85b0b1a1f5..71918bc73b55a 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -2610,7 +2610,7 @@ private static Map shards( builder.put(new ShardId(indexName, IndexMetadata.INDEX_UUID_NA_VALUE, 0), ShardSnapshotStatus.MISSING); } else { final IndexRoutingTable indexRoutingTable = routingTable.index(indexName); - for (int i : indexMetadata.getServingShardIds()) { + for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { final ShardId shardId = indexRoutingTable.shard(i).shardId(); final String shardRepoGeneration; diff --git a/server/src/test/java/org/opensearch/cluster/routing/ShardRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/ShardRoutingTests.java index 1d5f4b92a42cf..be3c93cc9d98f 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/ShardRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/ShardRoutingTests.java @@ -162,9 +162,10 @@ public void testEqualsIgnoringVersion() { otherRouting.unassignedInfo(), otherRouting.allocationId(), otherRouting.getExpectedShardSize(), - otherRouting.getRecoveringChildShardIds(), - otherRouting.getSplittingShardId() - ); + otherRouting.getShardRange(), + otherRouting.getParentShardId(), + otherRouting.getRecoveringChildShardRanges() + ); break; case 1: // change shard id @@ -178,8 +179,9 @@ public void testEqualsIgnoringVersion() { 
otherRouting.unassignedInfo(), otherRouting.allocationId(), otherRouting.getExpectedShardSize(), - otherRouting.getRecoveringChildShardIds(), - otherRouting.getSplittingShardId() + otherRouting.getShardRange(), + otherRouting.getParentShardId(), + otherRouting.getRecoveringChildShardRanges() ); break; case 2: @@ -197,8 +199,9 @@ public void testEqualsIgnoringVersion() { otherRouting.unassignedInfo(), otherRouting.allocationId(), otherRouting.getExpectedShardSize(), - otherRouting.getRecoveringChildShardIds(), - otherRouting.getSplittingShardId() + otherRouting.getShardRange(), + otherRouting.getParentShardId(), + otherRouting.getRecoveringChildShardRanges() ); } break; @@ -217,8 +220,9 @@ public void testEqualsIgnoringVersion() { otherRouting.unassignedInfo(), otherRouting.allocationId(), otherRouting.getExpectedShardSize(), - otherRouting.getRecoveringChildShardIds(), - otherRouting.getSplittingShardId() + otherRouting.getShardRange(), + otherRouting.getParentShardId(), + otherRouting.getRecoveringChildShardRanges() ); } break; @@ -242,8 +246,9 @@ public void testEqualsIgnoringVersion() { otherRouting.unassignedInfo(), otherRouting.allocationId(), otherRouting.getExpectedShardSize(), - otherRouting.getRecoveringChildShardIds(), - otherRouting.getSplittingShardId() + otherRouting.getShardRange(), + otherRouting.getParentShardId(), + otherRouting.getRecoveringChildShardRanges() ); } break; diff --git a/server/src/test/java/org/opensearch/index/shard/ShardSplittingQueryTests.java b/server/src/test/java/org/opensearch/index/shard/ShardSplittingQueryTests.java index 9b48fc22408c6..ca4db25ea076d 100644 --- a/server/src/test/java/org/opensearch/index/shard/ShardSplittingQueryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ShardSplittingQueryTests.java @@ -294,7 +294,7 @@ void assertSplit(Directory dir, IndexMetadata metadata, int targetShardId, boole IndexSearcher searcher = new IndexSearcher(reader); searcher.setQueryCache(null); final Weight 
splitWeight = searcher.createWeight( - searcher.rewrite(new ShardSplittingQuery(metadata, targetShardId, hasNested, (shardId) -> false)), + searcher.rewrite(new ShardSplittingQuery(metadata, targetShardId, hasNested, false)), ScoreMode.COMPLETE_NO_SCORES, 1f ); diff --git a/server/src/test/java/org/opensearch/snapshots/InternalSnapshotsInfoServiceTests.java b/server/src/test/java/org/opensearch/snapshots/InternalSnapshotsInfoServiceTests.java index 160eca003fb39..ad6a8761c66e2 100644 --- a/server/src/test/java/org/opensearch/snapshots/InternalSnapshotsInfoServiceTests.java +++ b/server/src/test/java/org/opensearch/snapshots/InternalSnapshotsInfoServiceTests.java @@ -463,7 +463,7 @@ private ClusterState addUnassignedShards(final ClusterState currentState, String currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY) ); final Map shards = new HashMap<>(); - for (int i : indexMetadata.getServingShardIds()) { + for (int i = 0; i < numberOfShards; i++) { shards.put(new ShardId(index, i), new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId())); } diff --git a/test/framework/src/main/java/org/opensearch/action/support/replication/ClusterStateCreationUtils.java b/test/framework/src/main/java/org/opensearch/action/support/replication/ClusterStateCreationUtils.java index 669b12d03cfcc..182b2c9288a3d 100644 --- a/test/framework/src/main/java/org/opensearch/action/support/replication/ClusterStateCreationUtils.java +++ b/test/framework/src/main/java/org/opensearch/action/support/replication/ClusterStateCreationUtils.java @@ -305,7 +305,7 @@ public static ClusterState stateWithAssignedPrimariesAndOneReplica(String index, state.nodes(discoBuilder); state.metadata(Metadata.builder().put(indexMetadata, false).generateClusterUuidIfNeeded()); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); - for (int i : indexMetadata.getServingShardIds()) { + for (int i = 0; i < numberOfShards; 
i++) { final ShardId shardId = new ShardId(index, "_na_", i); IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); indexShardRoutingBuilder.addShard( @@ -351,7 +351,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice .build(); metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); - for (int i : indexMetadata.getServingShardIds()) { + for (int i = 0; i < numberOfShards; i++) { final ShardId shardId = new ShardId(index, "_na_", i); IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); indexShardRoutingBuilder.addShard( diff --git a/test/framework/src/main/java/org/opensearch/cluster/routing/ShardRoutingHelper.java b/test/framework/src/main/java/org/opensearch/cluster/routing/ShardRoutingHelper.java index 098ced6f0bd3a..e09e333b90f25 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/routing/ShardRoutingHelper.java +++ b/test/framework/src/main/java/org/opensearch/cluster/routing/ShardRoutingHelper.java @@ -68,8 +68,9 @@ public static ShardRouting initWithSameId(ShardRouting copy, RecoverySource reco new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, null), copy.allocationId(), copy.getExpectedShardSize(), - copy.getRecoveringChildShardIds(), - copy.getSplittingShardId() + copy.getShardRange(), + copy.getParentShardId(), + copy.getRecoveringChildShardRanges() ); } @@ -88,8 +89,9 @@ public static ShardRouting newWithRestoreSource(ShardRouting routing, RecoverySo routing.unassignedInfo(), routing.allocationId(), routing.getExpectedShardSize(), - routing.getRecoveringChildShardIds(), - routing.getSplittingShardId() + routing.getShardRange(), + routing.getParentShardId(), + routing.getRecoveringChildShardRanges() ); } } diff --git 
a/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java b/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java index a44fd0113d7d7..530232060b647 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java +++ b/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java @@ -66,6 +66,7 @@ public static ShardRouting newShardRouting(ShardId shardId, String currentNodeId buildAllocationId(state), -1, null, + null, null ); } @@ -88,6 +89,7 @@ public static ShardRouting newShardRouting( buildAllocationId(state), -1, null, + null, null ); } @@ -127,6 +129,7 @@ public static ShardRouting newShardRouting( buildAllocationId(state), -1, null, + null, null ); } @@ -169,6 +172,7 @@ public static ShardRouting newShardRouting( allocationId, -1, null, + null, null ); } @@ -211,6 +215,7 @@ public static ShardRouting newShardRouting( buildAllocationId(state), -1, null, + null, null ); } @@ -235,6 +240,7 @@ public static ShardRouting newShardRouting( buildAllocationId(state), -1, null, + null, null ); }