Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Prabhat Sharma committed Apr 22, 2024
1 parent ddb0c79 commit cb45aa2
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
private final ActiveShardCount waitForActiveShards;
private final Map<String, RolloverInfo> rolloverInfos;
private final boolean isSystem;
private final Map<Integer, Set<Integer>> parentShards;
private final Map<Integer, SplitPartition> parentShards;

private IndexMetadata(
final Index index,
Expand Down Expand Up @@ -710,7 +710,7 @@ private IndexMetadata(
final ActiveShardCount waitForActiveShards,
final Map<String, RolloverInfo> rolloverInfos,
final boolean isSystem,
final Map<Integer, Set<Integer>> parentShards
final Map<Integer, SplitPartition> parentShards
) {

this.index = index;
Expand Down Expand Up @@ -899,9 +899,25 @@ public boolean isParentShard(ShardId shardId) {

public List<Integer> getChildShards(ShardId shardId) {
assert isParentShard(shardId);
return new ArrayList<>(parentShards.get(shardId.getId()));
return new ArrayList<>(parentShards.get(shardId.getId()).getChildShards());
}

public SplitPartition getParentShard(ShardId shardId) {
if(isParentShard(shardId))
return parentShards.get(shardId.getId());
return new SplitPartition(-1, Set.of(), routingFactor, routingNumShards);
}

public SplitPartition getPartition(Integer shardId) {
assert(isSplitPartition(shardId));
return parentShards.get(shardId);
}

public Boolean isSplitPartition(Integer shardId) {
if (parentShards.containsKey(shardId))
return parentShards.get(shardId).getChildShards().size() > 0;
return false;
}
public Set<String> inSyncAllocationIds(int shardId) {
assert shardId >= 0 && shardId < numberOfShards;
return inSyncAllocationIds.get(shardId);
Expand Down Expand Up @@ -1044,7 +1060,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
private final Diff<Map<Integer, Set<String>>> inSyncAllocationIds;
private final Diff<Map<String, RolloverInfo>> rolloverInfos;
private final boolean isSystem;
private final Diff<Map<Integer, Set<Integer>>> parentShardIds;
private final Diff<Map<Integer, SplitPartition>> parentShardIds;

IndexMetadataDiff(IndexMetadata before, IndexMetadata after) {
index = after.index.getName();
Expand All @@ -1070,8 +1086,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
parentShardIds = DiffableUtils.diff(
before.parentShards,
after.parentShards,
DiffableUtils.getVIntKeySerializer(),
DiffableUtils.IntegerSetValueSerializer.getInstance()
DiffableUtils.getVIntKeySerializer()
);
}

Expand All @@ -1083,6 +1098,8 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
new DiffableUtils.DiffableValueReader<>(DiffableStringMap::readFrom, DiffableStringMap::readDiffFrom);
private static final DiffableUtils.DiffableValueReader<String, RolloverInfo> ROLLOVER_INFO_DIFF_VALUE_READER =
new DiffableUtils.DiffableValueReader<>(RolloverInfo::new, RolloverInfo::readDiffFrom);
private static final DiffableUtils.DiffableValueReader<Integer, SplitPartition> SPLIT_PARTITION_DIFF_VALUE_READER =
new DiffableUtils.DiffableValueReader<>(SplitPartition::new, SplitPartition::readDiffFrom);

IndexMetadataDiff(StreamInput in) throws IOException {
index = in.readString();
Expand All @@ -1107,7 +1124,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
parentShardIds = DiffableUtils.readJdkMapDiff(
in,
DiffableUtils.getVIntKeySerializer(),
DiffableUtils.IntegerSetValueSerializer.getInstance()
SPLIT_PARTITION_DIFF_VALUE_READER
);
}

Expand Down Expand Up @@ -1192,10 +1209,8 @@ public static IndexMetadata readFrom(StreamInput in) throws IOException {
builder.system(in.readBoolean());
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
int parentShardsSize = in.readVInt();
for (int i = 0; i < parentShardsSize; i++) {
int parentShardId = in.readVInt();
Set<Integer> childShards = DiffableUtils.IntegerSetValueSerializer.getInstance().read(in, parentShardId);
builder.putParentShard(parentShardId, childShards);
for(int i=0; i < parentShardsSize; i++) {
builder.putParentShard(new SplitPartition(in));
}
}
return builder.build();
Expand Down Expand Up @@ -1237,9 +1252,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(isSystem);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeVInt(parentShards.size());
for (final Map.Entry<Integer, Set<Integer>> cursor : parentShards.entrySet()) {
out.writeVInt(cursor.getKey());
DiffableUtils.IntegerSetValueSerializer.getInstance().write(cursor.getValue(), out);
for(final SplitPartition partition : parentShards.values()) {
partition.writeTo(out);
}
} else if (parentShards.isEmpty() == false) {
throw new IllegalStateException("In-place split not allowed on older versions.");
Expand Down Expand Up @@ -1281,7 +1295,7 @@ public static class Builder {
private final Map<String, RolloverInfo> rolloverInfos;
private Integer routingNumShards;
private boolean isSystem;
private final Map<Integer, Set<Integer>> parentShards;
private final Map<Integer, SplitPartition> parentShards;

public Builder(String index) {
this.index = index;
Expand Down Expand Up @@ -1446,11 +1460,19 @@ public Builder putRolloverInfo(RolloverInfo rolloverInfo) {
return this;
}

public Builder putParentShard(Integer shard, Set<Integer> childShards) {
parentShards.put(shard, childShards);
public Builder putParentShard(SplitPartition parentPartition) {
parentShards.put(parentPartition.getParentShardId(), parentPartition);
return this;
}

public Builder initializeSplitShard(Integer shardId, SplitPartition parentPartition, Integer numSplits) {
Set<Integer> childShardIds = new HashSet<>();
for (int i = 0; i < numSplits; i++) {
childShardIds.add(shardId + i);
}
putParentShard(new SplitPartition(shardId, parentPartition, childShardIds));
return this;
}
public long version() {
return this.version;
}
Expand Down Expand Up @@ -1761,12 +1783,8 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build
builder.endObject();

builder.startObject(KEY_PARENT_SHARDS);
for (final Map.Entry<Integer, Set<Integer>> cursor : indexMetadata.parentShards.entrySet()) {
builder.startArray(String.valueOf(cursor.getKey()));
for (Integer childShard : cursor.getValue()) {
builder.value(childShard);
}
builder.endArray();
for (final SplitPartition cursor : indexMetadata.parentShards.values()) {
cursor.toXContent(builder, params);
}
builder.endObject();

Expand Down Expand Up @@ -1857,16 +1875,8 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_ARRAY) {
Set<Integer> childShards = new HashSet<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.VALUE_NUMBER) {
childShards.add(parser.intValue());
} else {
throw new IllegalArgumentException("Unexpected token: " + token);
}
}
builder.putParentShard(Integer.parseInt(currentFieldName), childShards);
} else if (token == XContentParser.Token.START_OBJECT) {
builder.putParentShard(SplitPartition.parse(parser, currentFieldName));
} else {
throw new IllegalArgumentException("Unexpected token: " + token);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1525,7 +1525,7 @@ public static int calculateNumRoutingShards(int numShards, Version indexVersionC
//
// We use as a default number of routing shards the higher number that can be expressed
// as {@code numShards * 2^x`} that is less than or equal to the maximum number of shards: 1024.
int log2MaxNumShards = 10; // logBase2(1024)
int log2MaxNumShards = 16; // logBase2(1024)
int log2NumShards = 32 - Integer.numberOfLeadingZeros(numShards - 1); // ceil(logBase2(numShards))
int numSplits = log2MaxNumShards - log2NumShards;
numSplits = Math.max(1, numSplits); // Ensure the index can be split at least once
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.ShardLimitValidator;

import java.util.HashSet;
import java.util.Set;
import java.util.function.BiFunction;

public class MetadataInPlaceShardSplitService {
Expand Down Expand Up @@ -135,15 +133,9 @@ public ClusterState applyShardSplitRequest(
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(curIndexMetadata);
int newShardId = curIndexMetadata.getNumberOfShards();
Set<Integer> childShardIds = new HashSet<>();
for (int shardId = newShardId; shardId < newShardId + request.getSplitInto(); shardId++) {
childShardIds.add(shardId);
}
indexMetadataBuilder.putParentShard(sourceShardId.id(), childShardIds);
indexMetadataBuilder.initializeSplitShard(curIndexMetadata.getNumberOfShards(),curIndexMetadata.getParentShard(sourceShardId), request.getSplitInto());
RoutingTable routingTable = routingTableBuilder.build();
metadataBuilder.put(indexMetadataBuilder);

ClusterState updatedState = ClusterState.builder(currentState).metadata(metadataBuilder).routingTable(routingTable).build();
return rerouteRoutingTable.apply(updatedState, "shard [" + request.getShardId() + "] of index [" + request.getIndex() + "] split");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.metadata;

import org.opensearch.cluster.AbstractDiffable;
import org.opensearch.cluster.Diff;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;

public class SplitPartition extends AbstractDiffable<SplitPartition> {

private final Integer parentShardId;
private final Set<Integer> childShardIds;

private final int routingFactor;
private final int routingShard;

public SplitPartition(Integer parentShardId, SplitPartition parentPartition, Set<Integer> childShardIds) {
this.parentShardId = parentShardId;
this.childShardIds = childShardIds;
int numChildShards = childShardIds.size();
this.routingShard = calculateNumRoutingShards(parentPartition.routingFactor, numChildShards);
this.routingFactor = this.routingShard / numChildShards;
}

public SplitPartition(Integer parentShardId, Set<Integer> childShardIds, Integer routingFactor, Integer routingShard) {
this.parentShardId = parentShardId;
this.childShardIds = childShardIds;
this.routingFactor = routingFactor;
this.routingShard = routingShard;
}

public SplitPartition(StreamInput in) throws IOException {
parentShardId = in.readVInt();
childShardIds = in.readSet(StreamInput::readVInt);
routingFactor = in.readVInt();
routingShard = in.readVInt();
}

/**
* Calculate the number of routing shards for a given parent shard
* @param parentRoutingFactor
* @param numChildShards
* @return the number of routing shards
* @throws IllegalArgumentException if the number of routing shards is less than 1 i.e. parent shard cannot be split further
*
* The number of routing shards is calculated as follows:
* newRoutingFactor = 2 ^ floor(log2(parentRoutingFactor / numChildShards))
* numRoutingShards = numChildShards * newRoutingFactor
*/
public static int calculateNumRoutingShards(int parentRoutingFactor, int numChildShards) {
int numRoutingShardsPerChildShard = parentRoutingFactor / numChildShards;
int log2numRoutingShardsPerChildShard = 32 - Integer.numberOfLeadingZeros(numRoutingShardsPerChildShard - 1); // ceil(log2(x)) = 32 - leadingZeros(x - 1)

int numAllowedSplits = (numRoutingShardsPerChildShard & (numRoutingShardsPerChildShard - 1)) == 0 // floor(log2(x)) = ceil(log2(x)) - 1 if x is a power of 2
? log2numRoutingShardsPerChildShard : log2numRoutingShardsPerChildShard - 1;
if(numAllowedSplits <= 0)
throw new IllegalArgumentException("Cannot split further");
return numChildShards * (1 << numAllowedSplits);
}

public int getRoutingFactor() {
return routingFactor;
}

public int getRoutingShard() {
return routingShard;
}

public ArrayList<Integer> getChildShards() {
return new ArrayList<>(childShardIds);
}

public Integer getParentShardId() {
return parentShardId;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(parentShardId);
out.writeCollection(childShardIds, StreamOutput::writeVInt);
out.writeVInt(routingFactor);
out.writeVInt(routingShard);
}

public static Diff<SplitPartition> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(SplitPartition::new, in);
}

public int hashCode() {
int result = childShardIds.hashCode();
result = 31 * result + parentShardId;
result = 31 * result + routingFactor;
result = 31 * result + routingShard;
return result;
}

public void toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject(parentShardId.toString());
builder.startArray("child_shard_ids");
for (final Integer childShard : childShardIds) {

builder.value(childShard);
}
builder.endArray();
builder.field("routing_factor", routingFactor);
builder.field("routing_shard", routingShard);
builder.endObject();
}

public static SplitPartition parse(XContentParser parser, String currentFieldName) throws IOException {
Integer parentShardId = Integer.parseInt(currentFieldName);
Set<Integer> childShardIds = new HashSet<>();
int routingFactor = 0;
int routingShard = 0;
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
String fieldName = parser.currentName();
if (fieldName.equals("child_shard_ids")) {
if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
childShardIds.add(parser.intValue());
}
}
} else if (fieldName.equals("routing_factor")) {
routingFactor = parser.intValue();
} else if (fieldName.equals("routing_shard")) {
routingShard = parser.intValue();
}
}
}
return new SplitPartition(parentShardId, childShardIds, routingFactor, routingShard);
}

public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

SplitPartition that = (SplitPartition) o;

if (routingFactor != that.routingFactor) {
return false;
}
if (routingShard != that.routingShard) {
return false;
}
return childShardIds.equals(that.childShardIds);
}
}
Loading

0 comments on commit cb45aa2

Please sign in to comment.