
Commit

Saved local changes
vikasvb90 committed Jul 31, 2024
1 parent a15b540 commit 621f6c2
Showing 16 changed files with 161 additions and 105 deletions.
@@ -74,6 +74,9 @@ public Set<String> getSupportedOptions() {

@Override
public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment round) {
if (true) {
return false;
}
processingEnv.getMessager().printMessage(Kind.NOTE, "Processing OpenSearch Api annotations");

if (processingEnv.getOptions().containsKey(OPTION_CONTINUE_ON_FAILING_CHECKS) == true) {
@@ -106,6 +109,10 @@ public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment
* @param enclosing enclosing element
*/
private void process(ExecutableElement executable, Element enclosing) {
if (true) {
return;
}

if (!inspectable(executable)) {
return;
}
@@ -174,6 +181,9 @@ private void process(ExecutableElement executable, WildcardType type) {
* @param ref reference type
*/
private void process(ExecutableElement executable, ReferenceType ref) {
if (true) {
return;
}
// The element has been processed already
if (processed.add(ref) == false) {
return;
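The three "if (true) { return ...; }" guards added above switch the API annotation processor off entirely for this work-in-progress commit. For comparison, below is a minimal sketch of the same opt-out driven by a processor option instead of a hard-coded short circuit; the class name and option name are hypothetical, and only OPTION_CONTINUE_ON_FAILING_CHECKS exists in the original file.

    import java.util.Set;
    import javax.annotation.processing.AbstractProcessor;
    import javax.annotation.processing.RoundEnvironment;
    import javax.annotation.processing.SupportedAnnotationTypes;
    import javax.lang.model.element.TypeElement;
    import javax.tools.Diagnostic.Kind;

    // Sketch only: checks can be disabled with -Aopensearch.api.skip_checks (assumed option).
    @SupportedAnnotationTypes("*")
    public class SkippableApiAnnotationProcessor extends AbstractProcessor {
        private static final String OPTION_SKIP_API_CHECKS = "opensearch.api.skip_checks";

        @Override
        public Set<String> getSupportedOptions() {
            return Set.of(OPTION_SKIP_API_CHECKS);
        }

        @Override
        public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment round) {
            // Explicit opt-out instead of an unconditional "return false".
            if (processingEnv.getOptions().containsKey(OPTION_SKIP_API_CHECKS)) {
                return false;
            }
            processingEnv.getMessager().printMessage(Kind.NOTE, "Processing OpenSearch Api annotations");
            // ... the actual annotation checks would run here ...
            return false;
        }
    }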
@@ -11,22 +11,18 @@
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.indices.split.InPlaceShardSplitRequest;
import org.opensearch.action.admin.indices.split.InPlaceShardSplitResponse;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.search.SearchHits;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@@ -78,7 +74,7 @@ private void waitForSplit(int numberOfSplits, Set<Integer> childShardIds, int pa
System.out.println("Shard split completed");
}

private void verifyAfterSplit(BackgroundIndexer indexer, int parentShardId, Set<Integer> childShardIds) throws InterruptedException {
private void verifyAfterSplit(long totalIndexedDocs, Set<String> ids, int parentShardId, Set<Integer> childShardIds) throws InterruptedException {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
IndexMetadata indexMetadata = clusterState.metadata().index("test");
assertTrue(indexMetadata.isParentShard(parentShardId));
@@ -90,18 +86,22 @@ private void verifyAfterSplit(BackgroundIndexer indexer, int parentShardId, Set<
}
assertEquals(childShardIds, newServingChildShardIds);

indexer.pauseIndexing();
indexer.stopAndAwaitStopped();
refresh("test");
ShardStats[] stats = client().admin().indices().prepareStats("test").get().getShards();
for (ShardStats shardStat : stats) {
logger.info("Shard stat after first indexing of shard " + shardStat.getShardRouting().shardId().id() + " docs: "
+ shardStat.getStats().indexing.getTotal().getIndexCount() + " seq no: " + shardStat.getSeqNoStats().getMaxSeqNo());
}

SearchHits hits = client().prepareSearch("test")
.setQuery(matchAllQuery())
.setSize((int) indexer.totalIndexedDocs())
.setSize((int) totalIndexedDocs)
.storedFields()
.execute()
.actionGet()
.getHits();
assertThat(hits.getTotalHits().value, equalTo(indexer.totalIndexedDocs()));
for (String id : indexer.getIds()) {
assertThat(hits.getTotalHits().value, equalTo(totalIndexedDocs));
for (String id : ids) {
// Make sure there is no duplicate doc.
assertHitCount(client().prepareSearch("test").setSize(0)
.setQuery(matchQuery("_id", id)).get(), 1);
@@ -119,7 +119,6 @@ public void testShardSplit() throws Exception {
logger.info("--> waiting for {} docs to be indexed ...", numDocs);
waitForDocs(numDocs, indexer);
logger.info("--> {} docs indexed", numDocs);

numDocs = scaledRandomIntBetween(200, 1000);
logger.info("--> Allow indexer to index [{}] more documents", numDocs);
indexer.continueIndexing(numDocs);
@@ -130,7 +129,9 @@ public void testShardSplit() throws Exception {
waitForSplit(numberOfSplits, childShardIds, parentShardId);
logger.info("--> Shard split completed ...");
logger.info("--> Verifying after split ...");
verifyAfterSplit(indexer, parentShardId, childShardIds);
indexer.pauseIndexing();
indexer.stopAndAwaitStopped();
verifyAfterSplit(indexer.totalIndexedDocs(), indexer.getIds(), parentShardId, childShardIds);
}
}

@@ -160,7 +161,36 @@ public void testSplittingShardHavingNonEmptyCommit() throws Exception {
waitForSplit(numberOfSplits, childShardIds, parentShardId);
logger.info("--> Shard split completed ...");
logger.info("--> Verifying after split ...");
verifyAfterSplit(indexer, parentShardId, childShardIds);
indexer.pauseIndexing();
indexer.stopAndAwaitStopped();
verifyAfterSplit(indexer.totalIndexedDocs(), indexer.getIds(), parentShardId, childShardIds);
}
}

public void testSplittingShardWithNoTranslogReplay() throws Exception {
internalCluster().startNodes(2);
prepareCreate("test", Settings.builder().put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)).get();
ensureGreen();
int numDocs = scaledRandomIntBetween(200, 2500);
try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs)) {
indexer.setIgnoreIndexingFailures(false);
logger.info("--> waiting for {} docs to be indexed ...", numDocs);
waitForDocs(numDocs, indexer);
logger.info("--> {} docs indexed", numDocs);
indexer.stopAndAwaitStopped();
flushAndRefresh("test");
ShardStats shardStat = client().admin().indices().prepareStats("test").get().getShards()[0];
assertEquals(numDocs, shardStat.getCommitStats().getNumDocs());

int numberOfSplits = 3, parentShardId = 0;
logger.info("--> starting split...");
Set<Integer> childShardIds = triggerSplitAndGetChildShardIds(parentShardId, numberOfSplits);
logger.info("--> waiting for shards to be split ...");
waitForSplit(numberOfSplits, childShardIds, parentShardId);
logger.info("--> Shard split completed ...");
logger.info("--> Verifying after split ...");
verifyAfterSplit(indexer.totalIndexedDocs(), indexer.getIds(), parentShardId, childShardIds);
}
}

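testSplittingShardWithNoTranslogReplay above stops the indexer and calls flushAndRefresh("test") before triggering the split, so the last Lucene commit already contains every indexed document and the child shards should have no translog operations left to replay. If that precondition were asserted directly inside the test, a hedged sketch using the regular index stats API (same test context and imports as above; whether this exact assertion holds on the in-place split branch is an assumption) could look like this:

    // After flushAndRefresh, all operations should be in the last commit and nothing should
    // remain uncommitted in the parent shard's translog.
    ShardStats parentStat = client().admin().indices().prepareStats("test").get().getShards()[0];
    assertEquals(numDocs, parentStat.getCommitStats().getNumDocs());
    assertEquals(0, parentStat.getStats().getTranslog().getUncommittedOperations());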
@@ -614,20 +614,19 @@ protected void doRun() {
continue;
}
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
ShardId shardId = clusterService.operationRouting()
.indexShards(clusterState, concreteIndex, request.id(), request.routing())
.shardId();
ShardId shardId;
if (reDriveOnChildShards) {
shardId = clusterService.operationRouting().shardWithRecoveringChild(clusterState,
concreteIndex, request.id(), request.routing(), shardId.getIndex());
concreteIndex, request.id(), request.routing(), clusterState.routingTable().index(concreteIndex).getIndex());
} else {
shardId = clusterService.operationRouting()
.indexShards(clusterState, concreteIndex, request.id(), request.routing())
.shardId();
}

List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, request));
}
if (reDriveOnChildShards) {
logger.info("Redriving requests on child shards, count " + bulkRequest.requests.size());
}

if (requestsByShard.isEmpty()) {
BulkItemResponse[] response = responses.toArray(new BulkItemResponse[responses.length()]);
@@ -906,7 +906,6 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
}
assert response.getResponse().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
if (discardOperation) {
logger.info("Discarding operation " + response.getResponse().getSeqNo());
operationResult = replica.markSeqNoAsNoop(
response.getResponse().getSeqNo(),
response.getResponse().getPrimaryTerm(),
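In the hunk above, replica operations flagged with discardOperation are not indexed on this child shard, but their sequence numbers are still recorded via replica.markSeqNoAsNoop(...). The reason is the contiguous-prefix semantics of the local checkpoint: if a sequence number were silently dropped, the checkpoint could never advance past it and the shard could not be considered in sync. A toy illustration of that behaviour (a simplified stand-in, not OpenSearch's LocalCheckpointTracker):

    import java.util.BitSet;

    // Toy tracker: the checkpoint advances only over a contiguous run of processed sequence
    // numbers, which is why even discarded operations must be marked, as no-ops.
    final class ToyCheckpointTracker {
        private final BitSet processed = new BitSet();
        private long checkpoint = -1;

        void markProcessed(long seqNo) {
            processed.set(Math.toIntExact(seqNo));
            while (processed.get(Math.toIntExact(checkpoint + 1))) {
                checkpoint++;
            }
        }

        long checkpoint() {
            return checkpoint;
        }

        public static void main(String[] args) {
            ToyCheckpointTracker tracker = new ToyCheckpointTracker();
            tracker.markProcessed(0);
            tracker.markProcessed(1);
            // Seq no 2 belongs to a different child shard after the split. Marking it anyway
            // (the markSeqNoAsNoop equivalent in this toy model) lets the checkpoint reach 3.
            tracker.markProcessed(2);
            tracker.markProcessed(3);
            System.out.println(tracker.checkpoint()); // prints 3
        }
    }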
@@ -689,7 +689,6 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
private final Map<Integer, SplitMetadata> parentToChildShardsMetadata;
private final int[] servingShardIds;
private final int numberOfNonServingShards;
private final Map<Integer, Integer> childToParentShardIds;

private IndexMetadata(
final Index index,
@@ -719,8 +718,7 @@ private IndexMetadata(
final boolean isSystem,
final Map<Integer, SplitMetadata> parentToChildShardsMetadata,
final int[] servingShardIds,
final int numberOfNonServingShards,
final Map<Integer, Integer> childToParentShardIds
final int numberOfNonServingShards
) {

this.index = index;
@@ -757,7 +755,6 @@ private IndexMetadata(
this.parentToChildShardsMetadata = Collections.unmodifiableMap(parentToChildShardsMetadata);
this.servingShardIds = servingShardIds;
this.numberOfNonServingShards = numberOfNonServingShards;
this.childToParentShardIds = childToParentShardIds;
assert numberOfSeedShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfSeedShards;
}

@@ -848,10 +845,6 @@ public int getNumOfNonServingShards() {
return numberOfNonServingShards;
}

public Integer getParentShardIdOrNull(int shardId) {
return childToParentShardIds.get(shardId);
}

public int[] getServingShardIds() {
return servingShardIds.clone();
}
@@ -1467,8 +1460,13 @@ private Builder numberOfNonServingShards(int numberOfNonServingShards) {
}

public Builder updateMetadataForNewChildShards(Map<Integer, String> newAllocationIds, int sourceShardId) {
if (parentToChildShardsMetadata.get(sourceShardId) == null) {
// parentToChildShardsMetadata will always have this parent at this point, but if we somehow reached
// here due to a missed fail-shard event while the shard split still went through, we need to
// re-populate it since the shard has already been split.
addChildShardsForSplittingShard(sourceShardId, new ArrayList<>(newAllocationIds.keySet()));
}
int numberOfNewShards = newAllocationIds.size();

int numberOfServingShards = this.servingShardIds.length + numberOfNewShards - 1;
int[] newServingShards = new int[numberOfServingShards];
int newIdx = 0;
@@ -1506,6 +1504,34 @@ public Builder updateMetadataForNewChildShards(Map<Integer, String> newAllocatio
return this;
}

public Builder addChildShardsForSplittingShard(int sourceShardId, List<Integer> childShardIds) {
Integer parentOfSource = null;
for (Integer parent : parentToChildShardsMetadata.keySet()) {
SplitMetadata splitMetadata = parentToChildShardsMetadata.get(parent);
for (Integer childShard : splitMetadata.getChildShards()) {
if (sourceShardId == childShard) {
parentOfSource = parent;
break;
}
}
if (parentOfSource != null) {
break;
}
}

int parentRoutingFactor = parentOfSource == null ? routingNumShards / INDEX_NUMBER_OF_SHARDS_SETTING.get(settings) :
parentToChildShardsMetadata.get(parentOfSource).getRoutingFactor();

SplitMetadata splitMetadata = new SplitMetadata(sourceShardId, childShardIds, parentRoutingFactor);
putParentToChildShardMetadata(splitMetadata);
return this;
}

public Builder removeParentToChildShardMetadata(Integer parentShardId) {
parentToChildShardsMetadata.remove(parentShardId);
return this;
}

public Builder numberOfReplicas(int numberOfReplicas) {
settings = Settings.builder().put(settings).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build();
return this;
@@ -1605,11 +1631,6 @@ public Builder putParentToChildShardMetadata(SplitMetadata splitMetadata) {
return this;
}

public Builder removeParentToChildShardMetadata(Integer parentShardId) {
parentToChildShardsMetadata.remove(parentShardId);
return this;
}

public long version() {
return this.version;
}
@@ -1709,12 +1730,6 @@ public IndexMetadata build() {
servingShardIds[i] = i;
}
}
Map<Integer, Integer> childToParentShardIds = new HashMap<>();
parentToChildShardsMetadata.forEach((parentShardId, splitMetadata) -> {
splitMetadata.getChildShards().forEach(childShard -> {
childToParentShardIds.put(childShard, parentShardId);
});
});

if (INDEX_NUMBER_OF_REPLICAS_SETTING.exists(settings) == false) {
throw new IllegalArgumentException("must specify number of replicas for index [" + index + "]");
@@ -1831,8 +1846,7 @@ public IndexMetadata build() {
isSystem,
parentToChildShardsMetadata,
servingShardIds,
numberOfNonServingShards,
childToParentShardIds
numberOfNonServingShards
);
}

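The new addChildShardsForSplittingShard builder method above first checks whether the shard being split is itself a child created by an earlier split; if it is, the routing factor recorded in that earlier split's SplitMetadata is reused, otherwise the factor is derived from routingNumShards and the configured number of shards. The same lookup as a standalone sketch (simplified signature; SplitMetadata is assumed to expose the getChildShards() and getRoutingFactor() accessors used above):

    import java.util.Map;

    // Example with assumed numbers: an index with 5 seed shards and routingNumShards = 640 splits
    // seed shard 2 with factor 640 / 5 = 128; splitting one of shard 2's children later reuses the
    // factor stored in shard 2's SplitMetadata, so nested splits keep subdividing consistently.
    static int resolveParentRoutingFactor(
        Map<Integer, SplitMetadata> parentToChildShardsMetadata,
        int sourceShardId,
        int routingNumShards,
        int numberOfSeedShards
    ) {
        for (SplitMetadata splitMetadata : parentToChildShardsMetadata.values()) {
            for (Integer childShard : splitMetadata.getChildShards()) {
                if (sourceShardId == childShard) {
                    // The splitting shard is itself a child of an earlier split.
                    return splitMetadata.getRoutingFactor();
                }
            }
        }
        // The splitting shard is one of the original (seed) shards.
        return routingNumShards / numberOfSeedShards;
    }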
@@ -157,13 +157,7 @@ public ClusterState applyShardSplitRequest(
childShardIds.add(maxUsedShardId + i);
}

Integer parentShardId = curIndexMetadata.getParentShardIdOrNull(sourceShardId.id());
int parentRoutingFactor = parentShardId == null ? curIndexMetadata.getRoutingFactor() :
curIndexMetadata.getSplitMetadata(parentShardId).getRoutingFactor();

SplitMetadata splitMetadata = new SplitMetadata(sourceShardId.id(), childShardIds, parentRoutingFactor);

indexMetadataBuilder.putParentToChildShardMetadata(splitMetadata);
indexMetadataBuilder.addChildShardsForSplittingShard(sourceShardId.id(), childShardIds);
RoutingTable routingTable = routingTableBuilder.build();
metadataBuilder.put(indexMetadataBuilder);

@@ -193,7 +193,6 @@ private void updateSplitSourceOutgoingRecovery(ShardRouting splitSource, final b
}

private void updateRecoveryCounts(final ShardRouting routing, final boolean increment, @Nullable final ShardRouting primary) {

final int howMany = increment ? 1 : -1;
assert routing.initializing() : "routing must be initializing: " + routing;
// TODO: check primary == null || primary.active() after all tests properly add ReplicaAfterPrimaryActiveAllocationDecider
@@ -614,47 +613,49 @@ public Tuple<ShardRouting, List<ShardRouting>> splitShard(

public void startInPlaceChildShards(
Logger logger,
List<ShardRouting> inPlaceChildShards,
List<ShardRouting> childShards,
IndexMetadata indexMetadata,
RoutingChangesObserver routingChangesObserver
) {
ensureMutable();
assert !inPlaceChildShards.isEmpty();
ShardRouting sourceShard = getByAllocationId(
inPlaceChildShards.get(0).getSplittingShardId(),
inPlaceChildShards.get(0).allocationId().getParentAllocationId()
assert !childShards.isEmpty();
ShardRouting parentShard = getByAllocationId(
childShards.get(0).getSplittingShardId(),
childShards.get(0).allocationId().getParentAllocationId()
);
int validShardEvents = 0, invalidShardEvents = 0;
for (ShardRouting childShardStartedEvent : inPlaceChildShards) {
if (childShardStartedEvent.isSplitTargetOf(sourceShard) == false) {
for (ShardRouting childShard : childShards) {
if (childShard.isSplitTargetOf(parentShard) == false) {
invalidShardEvents++;
} else {
validShardEvents++;
}
}

if (invalidShardEvents != 0 || validShardEvents != sourceShard.getRecoveringChildShards().length) {
if (invalidShardEvents != 0 || validShardEvents != parentShard.getRecoveringChildShards().length) {
logger.error(
"Invalid shard started event for child shards received. Unknown child found :["
+ (invalidShardEvents != 0)
+ "], Number of missing child shards in started shard event: ["
+ (sourceShard.getRecoveringChildShards().length - validShardEvents)
+ (parentShard.getRecoveringChildShards().length - validShardEvents)
+ "], Parent shard is valid: ["
+ (indexMetadata.isParentShard(parentShard.shardId().id()) == true)
+ "]. Failing all child shards and cancelling relocation."
);
// We just need to fail one child shard because failShard ensures that failure of any child shard
// fails all child shards and cancels relocation of source.
// fails all child shards and cancels split of source shard.
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "");
failShard(logger, inPlaceChildShards.get(0), unassignedInfo, indexMetadata, routingChangesObserver);
failShard(logger, childShards.get(0), unassignedInfo, indexMetadata, routingChangesObserver);
return;
}

inPlaceChildShards.forEach(childShard -> {
childShards.forEach(childShard -> {
ShardRouting startedShard = started(childShard);
logger.trace("{} marked shard as started (routing: {})", childShard.shardId(), childShard);
routingChangesObserver.shardStarted(childShard, startedShard);
});
remove(sourceShard);
routingChangesObserver.splitCompleted(sourceShard, indexMetadata);
remove(parentShard);
routingChangesObserver.splitCompleted(parentShard, indexMetadata);
}

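startInPlaceChildShards above only marks the child shards as started when the batch of started events matches the recovering children of the parent exactly; a single unknown or missing child fails one child shard, which cascades into failing all children and cancelling the split. The acceptance rule restated as a standalone helper (simplified, not part of the real RoutingNodes API; isSplitTargetOf and getRecoveringChildShards are used as in the hunk above):

    import java.util.List;

    // Accept the batch only if every event is for a child of this parent and every recovering
    // child is present; anything else makes the whole in-place split fail atomically.
    static boolean acceptChildStartedEvents(ShardRouting parentShard, List<ShardRouting> childShards) {
        int valid = 0;
        for (ShardRouting childShard : childShards) {
            if (childShard.isSplitTargetOf(parentShard) == false) {
                return false; // event for a shard that is not a split target of this parent
            }
            valid++;
        }
        return valid == parentShard.getRecoveringChildShards().length;
    }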
