Child replica recovery
vikasvb90 committed Jan 7, 2025
1 parent ee2ccde commit 1575e94
Showing 58 changed files with 1,611 additions and 708 deletions.
@@ -17,6 +17,8 @@
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.ShardRange;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
@@ -55,20 +57,34 @@ private Set<Integer> triggerSplitAndGetChildShardIds(int parentShardId, int numb
return Arrays.stream(shards).map(ShardRange::getShardId).collect(Collectors.toSet());
}

private void waitForSplit(int numberOfSplits, Set<Integer> childShardIds, int parentShardId) throws Exception {
private void waitForSplit(int numberOfSplits, Set<Integer> childShardIds, int parentShardId, int replicaCount) throws Exception {
final long maxWaitTimeMs = Math.max(190 * 1000, 200 * numberOfSplits);

assertBusy(() -> {
ShardStats[] shardStats = client().admin().indices().prepareStats("test").get().getShards();
int startedChildShards = 0;
for (ShardStats shardStat : shardStats) {
if (shardStat.getShardRouting().shardId().id() == parentShardId && shardStat.getShardRouting().started()) {
ShardRouting shardRouting = shardStat.getShardRouting();
if (shardRouting.primary() && shardRouting.shardId().id() == parentShardId && shardStat.getShardRouting().started()) {
throw new Exception("Splitting of shard id " + parentShardId + " failed ");
} else if (childShardIds.contains(shardStat.getShardRouting().shardId().id())) {
startedChildShards ++;
startedChildShards++;
}
}
assertEquals(numberOfSplits, startedChildShards);
if (numberOfSplits + (replicaCount + 1) == startedChildShards) {
System.out.println();
}
assertEquals(numberOfSplits * (replicaCount + 1), startedChildShards);
// ClusterState state = client().admin().cluster().prepareState().get().getState();
// int startedChildReplicas = 0;
// for (RoutingNode routingNode : state.getRoutingNodes()) {
// for (ShardRouting shardRouting : routingNode) {
// if (shardRouting.isStartedChildReplica()) {
// startedChildReplicas++;
// }
// }
// }
// assertEquals(numberOfSplits * (replicaCount), startedChildReplicas);
}, maxWaitTimeMs, TimeUnit.MILLISECONDS);

assertClusterHealth();
@@ -105,11 +121,11 @@ private void verifyAfterSplit(long totalIndexedDocs, Set<String> ids, int parent
assertEquals(childShardIds, newServingChildShardIds);

refresh("test");
ShardStats[] stats = client().admin().indices().prepareStats("test").get().getShards();
for (ShardStats shardStat : stats) {
logger.info("Shard stat after first indexing of shard " + shardStat.getShardRouting().shardId().id() + " docs: "
+ shardStat.getStats().indexing.getTotal().getIndexCount() + " seq no: " + shardStat.getSeqNoStats().getMaxSeqNo());
}
// ShardStats[] stats = client().admin().indices().prepareStats("test").get().getShards();
// for (ShardStats shardStat : stats) {
// logger.info("Shard stat after first indexing of shard " + shardStat.getShardRouting().shardId().id() + " docs: "
// + shardStat.getStats().indexing.getTotal().getIndexCount() + " seq no: " + shardStat.getSeqNoStats().getMaxSeqNo());
// }

SearchHits hits = client().prepareSearch("test")
.setQuery(matchAllQuery())
@@ -129,22 +145,23 @@

public void testShardSplit() throws Exception {
internalCluster().startNodes(2);
int replicaCount = 2;
prepareCreate("test", Settings.builder().put("index.number_of_shards", 3)
.put("index.number_of_replicas", 0)).get();
.put("index.number_of_replicas", replicaCount)).get();
ensureGreen();
int numDocs = scaledRandomIntBetween(200, 2500);
int numDocs = scaledRandomIntBetween(1500, 2400);
try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs, 4)) {
logger.info("--> waiting for {} docs to be indexed ...", numDocs);
waitForDocs(numDocs, indexer);
logger.info("--> {} docs indexed", numDocs);
numDocs = scaledRandomIntBetween(200, 1000);
numDocs = scaledRandomIntBetween(5000, 7500);
logger.info("--> Allow indexer to index [{}] more documents", numDocs);
indexer.continueIndexing(numDocs);
int numberOfSplits = 3, parentShardId = 0;
logger.info("--> starting split...");
Set<Integer> childShardIds = triggerSplitAndGetChildShardIds(parentShardId, numberOfSplits);
logger.info("--> waiting for shards to be split ...");
waitForSplit(numberOfSplits, childShardIds, parentShardId);
waitForSplit(numberOfSplits, childShardIds, parentShardId, replicaCount);
logger.info("--> Shard split completed ...");
logger.info("--> Verifying after split ...");
indexer.pauseIndexing();
@@ -155,8 +172,9 @@ public void testShardSplit() throws Exception {

public void testSplittingShardHavingNonEmptyCommit() throws Exception {
internalCluster().startNodes(2);
int replicaCount = 0;
prepareCreate("test", Settings.builder().put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)).get();
.put("index.number_of_replicas", replicaCount)).get();
ensureGreen();
int numDocs = scaledRandomIntBetween(200, 2500);
try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs, 4)) {
@@ -176,7 +194,7 @@ public void testSplittingShardHavingNonEmptyCommit() throws Exception {
logger.info("--> starting split...");
Set<Integer> childShardIds = triggerSplitAndGetChildShardIds(parentShardId, numberOfSplits);
logger.info("--> waiting for shards to be split ...");
waitForSplit(numberOfSplits, childShardIds, parentShardId);
waitForSplit(numberOfSplits, childShardIds, parentShardId, replicaCount);
logger.info("--> Shard split completed ...");
logger.info("--> Verifying after split ...");
indexer.pauseIndexing();
@@ -187,8 +205,9 @@

public void testSplittingShardWithNoTranslogReplay() throws Exception {
internalCluster().startNodes(2);
int replicaCount = 0;
prepareCreate("test", Settings.builder().put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)).get();
.put("index.number_of_replicas", replicaCount)).get();
ensureGreen();
int numDocs = scaledRandomIntBetween(200, 2500);
try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs, 4)) {
@@ -205,7 +224,7 @@ public void testSplittingShardWithNoTranslogReplay() throws Exception {
logger.info("--> starting split...");
Set<Integer> childShardIds = triggerSplitAndGetChildShardIds(parentShardId, numberOfSplits);
logger.info("--> waiting for shards to be split ...");
waitForSplit(numberOfSplits, childShardIds, parentShardId);
waitForSplit(numberOfSplits, childShardIds, parentShardId, replicaCount);
logger.info("--> Shard split completed ...");
logger.info("--> Verifying after split ...");
verifyAfterSplit(indexer.totalIndexedDocs(), indexer.getIds(), parentShardId, childShardIds);
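Note: the updated assertion in waitForSplit above counts every started copy of each child shard, i.e. the child primaries plus their replicas. A minimal sketch of that arithmetic (the helper name is illustrative, not part of the change):

    static int expectedStartedChildShardCopies(int numberOfSplits, int replicaCount) {
        // Each child shard contributes one primary plus replicaCount replicas.
        return numberOfSplits * (replicaCount + 1);
    }

    // Example: testShardSplit splits one parent shard into 3 children on an index
    // created with index.number_of_replicas = 2, so the stats API should eventually
    // report 3 * (2 + 1) = 9 started child shard copies.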
@@ -241,7 +241,6 @@ public PitAwareShardRouting(
allocationId,
expectedShardSize,
null,
null,
null
);
this.pitId = pitId;
@@ -668,6 +668,9 @@ static boolean executeBulkItemRequest(
request.isRetry()
);
}
// if (primary.isPrimaryMode() && primary.shardId().id() == 0) {
// logger.info("Indexing operation sequence " + result.getSeqNo() + " on shard 0.");
// }
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
// if (context.getBulkShardRequest().shardId().id() == 0) {
// logger.info("Executing bulk item mapping update");
@@ -873,14 +876,21 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
final BulkItemResponse response = item.getPrimaryResponse();
final Engine.Result operationResult;
boolean discardOperation = false;
if (replica.routingEntry().isSplitTarget() == true) {
if (replica.getParentShardId() != null) {
IndexMetadata indexMetadata = replica.indexSettings().getIndexMetadata();
// Discard operations belonging to a different child shard. This can happen during in-place shard
// split recovery where after all child shards are added to replication tracker, bulk
// operations are replicated to all child primaries.
int computedShardId = OperationRouting.generateShardId(indexMetadata, item.request().id(),
item.request().routing(), true);
discardOperation = computedShardId != replica.shardId().id();
// if (replica.routingEntry().isStartedChildReplica()) {
// logger.info("Processing seq no." + response.getResponse().getSeqNo() + " on replica child "
// + replica.shardId().id() + ", discarding " + discardOperation);
// } else if (replica.routingEntry().isSplitTarget()) {
// logger.info("Processing seq no. on child primary" + response.getResponse().getSeqNo() + " on replica child "
// + replica.shardId().id() + ", discarding " + discardOperation);
// }
}

if (item.getPrimaryResponse().isFailed()) {
@@ -909,7 +919,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
operationResult = replica.markSeqNoAsNoop(
response.getResponse().getSeqNo(),
response.getResponse().getPrimaryTerm(),
"op belongs to another child shard"
Translog.NoOp.FILLING_GAPS
);
} else {
operationResult = performOpOnReplica(response.getResponse(), item.request(), replica);
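Note: the performOnReplica hunks above replace the split-target-only check with replica.getParentShardId() != null, so child replicas, not just child primaries, discard operations routed to sibling child shards, and the discarded no-op now uses the Translog.NoOp.FILLING_GAPS reason. A hedged sketch of the flow as it might sit inside TransportShardBulkAction, reusing only calls visible in the hunks above (the helper name applyOrDiscardOnChildShard is hypothetical):

    private static Engine.Result applyOrDiscardOnChildShard(BulkItemRequest item,
                                                            BulkItemResponse response,
                                                            IndexShard replica) throws Exception {
        boolean discardOperation = false;
        if (replica.getParentShardId() != null) {
            // The shard is a child created by an in-place split: recompute which child
            // shard the document actually routes to from its id and routing value.
            IndexMetadata indexMetadata = replica.indexSettings().getIndexMetadata();
            int computedShardId = OperationRouting.generateShardId(indexMetadata, item.request().id(),
                item.request().routing(), true);
            discardOperation = computedShardId != replica.shardId().id();
        }
        if (discardOperation) {
            // The operation belongs to a sibling child shard: record a no-op so the
            // local checkpoint still advances past this sequence number.
            return replica.markSeqNoAsNoop(
                response.getResponse().getSeqNo(),
                response.getResponse().getPrimaryTerm(),
                Translog.NoOp.FILLING_GAPS
            );
        }
        // Otherwise apply the operation on the replica as before.
        return performOpOnReplica(response.getResponse(), item.request(), replica);
    }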
@@ -169,9 +169,6 @@ private void handlePrimaryResult(final PrimaryResultT primaryResult) {
if (TransportShardBulkAction.debugRequest.get() && request.shardId().id() == 0) {
logger.info("Handling primary result for id: " + id);
}
if (request.shardId.id() == 3) {
System.out.println();
}
if (replicaRequest != null) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request);
@@ -391,12 +388,6 @@ public boolean shouldRetry(Exception e) {

private void updateCheckPoints(ShardRouting shard, LongSupplier localCheckpointSupplier, LongSupplier globalCheckpointSupplier) {
try {
if (TransportShardBulkAction.debugRequest.get() == true) {
logger.info("Updating checkpoint for shard " + shard.shardId().id()
+ " on primary shard allocation " + primary.routingEntry().allocationId().getId()
+ " for replica shard allocation " + shard.allocationId().getId()
+ " with checkpoint " + localCheckpointSupplier.getAsLong());
}
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), localCheckpointSupplier.getAsLong());
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), globalCheckpointSupplier.getAsLong());
} catch (final AlreadyClosedException e) {
@@ -577,7 +577,7 @@ public void failShardIfNeeded(
// If a write action fails due to the closure of the primary shard
// then the replicas should not be marked as failed since they are
// still up-to-date with the (now closed) primary shard
if (exception instanceof PrimaryShardClosedException == false && replica.isSplitTarget() == false) {
if (exception instanceof PrimaryShardClosedException == false) {
shardStateAction.remoteShardFailed(
replica.shardId(),
replica.allocationId().getId(),