Skip to content

Commit

Permalink
Child recovery failure handling
Browse files Browse the repository at this point in the history
  • Loading branch information
vikasvb90 committed May 5, 2024
1 parent fdb6b83 commit 0c51aed
Show file tree
Hide file tree
Showing 16 changed files with 241 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public void remoteShardFailed(
) {
assert primaryTerm > 0L : "primary term should be strictly positive";
remoteFailedShardsDeduplicator.executeOnce(
new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale),
new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale, null),
listener,
(req, reqListener) -> sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), req, reqListener)
);
Expand Down Expand Up @@ -288,14 +288,29 @@ public void localShardFailed(
@Nullable final Exception failure,
ActionListener<Void> listener,
final ClusterState currentState
) {
localShardFailed(shardRouting, message, failure, listener, currentState, null);
}

/**
* Sends a shard-failed request to the cluster-manager node so that the cluster state is updated after a shard on the
* local node has failed.
* <p>
* The request entry is built with a primary term of {@code 0L} and is sent under {@code SHARD_FAILED_ACTION_NAME}.
*
* @param shardRouting routing entry of the failed local shard; supplies the shard id and allocation id of the request
* @param message description of the failure
* @param failure exception that caused the failure, may be {@code null}
* @param listener listener passed on to {@code sendShardAction} together with the request
* @param currentState cluster state passed on to {@code sendShardAction}
* @param childShardsFailedEvent when {@link Boolean#TRUE}, marks the request as a failure of the child shards of an
* in-place split; the overload without this parameter passes {@code null}
*/
public void localShardFailed(
final ShardRouting shardRouting,
final String message,
@Nullable final Exception failure,
ActionListener<Void> listener,
final ClusterState currentState,
final Boolean childShardsFailedEvent
) {
FailedShardEntry shardEntry = new FailedShardEntry(
shardRouting.shardId(),
shardRouting.allocationId().getId(),
0L,
message,
failure,
true
true,
childShardsFailedEvent
);
sendShardAction(SHARD_FAILED_ACTION_NAME, currentState, shardEntry, listener);
}
Expand Down Expand Up @@ -452,6 +467,7 @@ public ClusterTasksResult<FailedShardEntry> execute(ClusterState currentState, L
List<FailedShardEntry> tasksToBeApplied = new ArrayList<>();
List<FailedShard> failedShardsToBeApplied = new ArrayList<>();
List<StaleShard> staleShardsToBeApplied = new ArrayList<>();
int numberOfFailedChildShards = 0;

for (FailedShardEntry task : tasks) {
IndexMetadata indexMetadata = currentState.metadata().index(task.shardId.getIndex());
Expand Down Expand Up @@ -514,6 +530,13 @@ public ClusterTasksResult<FailedShardEntry> execute(ClusterState currentState, L
logger.debug("{} ignoring shard failed task [{}] (shard does not exist anymore)", task.shardId, task);
batchResultBuilder.success(task);
}
} else if (Boolean.TRUE.equals(task.childShardsFailedEvent)) {
logger.debug("{} failing child shards {} (shard failed task: [{}])", task.shardId, matched, task);
tasksToBeApplied.add(task);
for (ShardRouting childShard : matched.getRecoveringChildShards()) {
numberOfFailedChildShards++;
failedShardsToBeApplied.add(new FailedShard(childShard, task.message, task.failure, task.markAsStale));
}
} else {
// failing a shard also possibly marks it as stale (see IndexMetadataUpdater)
logger.debug("{} failing shard {} (shard failed task: [{}])", task.shardId, matched, task);
Expand All @@ -522,7 +545,8 @@ public ClusterTasksResult<FailedShardEntry> execute(ClusterState currentState, L
}
}
}
assert tasksToBeApplied.size() == failedShardsToBeApplied.size() + staleShardsToBeApplied.size();
assert tasksToBeApplied.size() == failedShardsToBeApplied.size() + staleShardsToBeApplied.size()
- (numberOfFailedChildShards > 0 ? numberOfFailedChildShards - 1 : 0);

ClusterState maybeUpdatedState = currentState;
try {
Expand Down Expand Up @@ -578,6 +602,10 @@ public static class FailedShardEntry extends TransportRequest {
final Exception failure;
final boolean markAsStale;

@Nullable
final Boolean childShardsFailedEvent;


FailedShardEntry(StreamInput in) throws IOException {
super(in);
shardId = new ShardId(in);
Expand All @@ -586,6 +614,11 @@ public static class FailedShardEntry extends TransportRequest {
message = in.readString();
failure = in.readException();
markAsStale = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
childShardsFailedEvent = in.readOptionalBoolean();
} else {
childShardsFailedEvent = null;
}
}

public FailedShardEntry(
Expand All @@ -594,14 +627,16 @@ public FailedShardEntry(
long primaryTerm,
String message,
@Nullable Exception failure,
boolean markAsStale
boolean markAsStale,
Boolean childShardsFailedEvent
) {
this.shardId = shardId;
this.allocationId = allocationId;
this.primaryTerm = primaryTerm;
this.message = message;
this.failure = failure;
this.markAsStale = markAsStale;
this.childShardsFailedEvent = childShardsFailedEvent;
}

public ShardId getShardId() {
Expand All @@ -621,6 +656,16 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(message);
out.writeException(failure);
out.writeBoolean(markAsStale);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalBoolean(childShardsFailedEvent);
} else {
if (childShardsFailedEvent != null) {
// An in-progress shard split is not allowed in a mixed cluster that contains node(s) on a version without
// in-place split support. Hence, we also refuse to send this state to a node on an unsupported version
// while a shard split is in progress.
throw new IllegalStateException("In-place split not allowed on older versions.");
}
}
}

@Override
Expand All @@ -634,6 +679,7 @@ public String toString() {
components.add("failure [" + ExceptionsHelper.detailedMessage(failure) + "]");
}
components.add("markAsStale [" + markAsStale + "]");
components.add("childShardsFailedEvent [" + Boolean.TRUE.equals(childShardsFailedEvent) + "]");
return String.join(", ", components);
}

Expand All @@ -646,12 +692,13 @@ public boolean equals(Object o) {
return Objects.equals(this.shardId, that.shardId)
&& Objects.equals(this.allocationId, that.allocationId)
&& primaryTerm == that.primaryTerm
&& markAsStale == that.markAsStale;
&& markAsStale == that.markAsStale
&& childShardsFailedEvent == that.childShardsFailedEvent;
}

@Override
public int hashCode() {
return Objects.hash(shardId, allocationId, primaryTerm, markAsStale);
return Objects.hash(shardId, allocationId, primaryTerm, markAsStale, childShardsFailedEvent);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1601,6 +1601,11 @@ public Builder putParentToChildShardMetadata(SplitMetadata splitMetadata) {
return this;
}

/**
* Removes the entry stored under the given parent shard id from {@code parentToChildShardsMetadata}.
*
* @param parentShardId id of the parent shard whose entry is removed
* @return this builder
*/
public Builder removeParentToChildShardMetadata(Integer parentShardId) {
parentToChildShardsMetadata.remove(parentShardId);
return this;
}

public long version() {
return this.version;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public interface RoutingChangesObserver {
*/
void splitCompleted(ShardRouting removedSplitSource, IndexMetadata indexMetadata);

/**
* Called when an in-place split has failed, i.e. its child shards could not be recovered.
*
* @param splitSource routing entry of the shard that was being split
* @param indexMetadata index metadata handed over by the caller (presumably of the index owning {@code splitSource} —
* TODO confirm)
*/
void splitFailed(ShardRouting splitSource, IndexMetadata indexMetadata);

/**
* Called when started replica is promoted to primary.
*/
Expand Down Expand Up @@ -152,6 +157,11 @@ public void splitCompleted(ShardRouting removedSplitSource, IndexMetadata indexM

}

@Override
public void splitFailed(ShardRouting splitSource, IndexMetadata indexMetadata) {
// Intentionally empty: this implementation does not react to failed splits.
}

@Override
public void replicaPromoted(ShardRouting replicaShard) {

Expand Down Expand Up @@ -239,6 +249,13 @@ public void splitCompleted(ShardRouting removedSplitSource, IndexMetadata indexM
}
}

/**
 * Forwards the split-failed notification, unchanged, to every observer in {@code routingChangesObservers}.
 */
@Override
public void splitFailed(ShardRouting splitSource, IndexMetadata indexMetadata) {
    for (RoutingChangesObserver observer : routingChangesObservers) {
        observer.splitFailed(splitSource, indexMetadata);
    }
}

@Override
public void replicaPromoted(ShardRouting replicaShard) {
for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,8 +798,8 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
assert childShard.isSplitTargetOf(failedShard);
logger.trace("{} is removed due to the failure/cancellation of the source shard", childShard);
remove(childShard);
routingChangesObserver.shardFailed(childShard, unassignedInfo);
}
routingChangesObserver.splitFailed(failedShard, indexMetadata);
}

// fail actual shard
Expand All @@ -818,22 +818,21 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
failedShard.getSplittingShardId(),
failedShard.allocationId().getParentAllocationId()
);
assert sourceShard.isSplitSourceOf(failedShard);
logger.trace(
"{}, resolved source to [{}]. canceling split ... ({})",
failedShard.shardId(),
sourceShard,
unassignedInfo.shortSummary()
);
cancelSplit(sourceShard);
// If recovery of any child shard fails then we fail all child shards. It is right to fail all
// child shards here because this ensures entire recovery cleanup i.e. relocation cancellation
// of source shard and failing all child shards. Also, it aligns with primary failure approach where
// failing primary also ensures failing all replicas and AllocationService anyway ensures that we
// don't get missing failed shard routings here.
for (ShardRouting childShard : sourceShard.getRecoveringChildShards()) {
remove(childShard);
routingChangesObserver.shardFailed(childShard, unassignedInfo);
// If the source shard is no longer splitting, its split was already cancelled while failing a sibling child shard in an earlier iteration.
if (sourceShard.splitting() == true) {
assert sourceShard.isSplitSourceOf(failedShard);
logger.trace(
"{}, resolved source to [{}]. canceling split ... ({})",
failedShard.shardId(),
sourceShard,
unassignedInfo.shortSummary()
);
cancelSplit(sourceShard);

for (ShardRouting childShard : sourceShard.getRecoveringChildShards()) {
remove(childShard);
}
routingChangesObserver.splitFailed(sourceShard, indexMetadata);
}
} else {
// The shard is a target or child of a relocating shard. In that case we only need to remove the target/child shard(s) and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -107,7 +106,7 @@ public void shardStarted(ShardRouting initializingShard, ShardRouting startedSha

@Override
public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) {
if (failedShard.active() && failedShard.primary()) {
if (failedShard.active() && failedShard.primary() && failedShard.isSplitTarget() == false) {
Updates updates = changes(failedShard.shardId());
if (updates.firstFailedPrimary == null) {
// more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...)
Expand All @@ -133,6 +132,12 @@ public void splitCompleted(ShardRouting removedSplitSource, IndexMetadata indexM
}
}

/**
 * Flags the pending updates of the split source's shard id as "split failed"; the flag is read back in
 * {@code applyChanges}.
 */
@Override
public void splitFailed(ShardRouting splitSource, IndexMetadata indexMetadata) {
    changes(splitSource.shardId()).splitFailed = true;
}

/**
* Updates the current {@link Metadata} based on the changes of this RoutingChangesObserver. Specifically
* we update {@link IndexMetadata#getInSyncAllocationIds()} and {@link IndexMetadata#primaryTerm(int)} based on
Expand Down Expand Up @@ -162,7 +167,10 @@ public Metadata applyChanges(Metadata oldMetadata, RoutingTable newRoutingTable)
}
// Invoke metadata update of in-place split only for the parent shard.
if (updates.addedChildShards.isEmpty() == false) {
indexMetadataBuilder = updateMetadataInPlaceSplit(oldIndexMetadata, indexMetadataBuilder, shardId, updates);
indexMetadataBuilder = updateMetadataForInPlaceSplitCompleted(oldIndexMetadata, indexMetadataBuilder, shardId, updates);
}
if (updates.splitFailed) {
    // Keep the returned builder: when no builder exists yet the helper creates a new one, and without this
    // assignment that builder (and with it the removal of the split metadata) would be silently dropped.
    indexMetadataBuilder = updateMetadataForInPlaceSplitFailed(oldIndexMetadata, indexMetadataBuilder, shardId);
}
}

Expand Down Expand Up @@ -368,7 +376,7 @@ private IndexMetadata.Builder updatePrimaryTerm(
/**
* Adds primary terms of child shards and updates number of shards.
*/
private IndexMetadata.Builder updateMetadataInPlaceSplit(
private IndexMetadata.Builder updateMetadataForInPlaceSplitCompleted(
IndexMetadata oldIndexMetadata,
IndexMetadata.Builder indexMetadataBuilder,
ShardId parentShardId,
Expand All @@ -391,6 +399,18 @@ private IndexMetadata.Builder updateMetadataInPlaceSplit(
return indexMetadataBuilder;
}

/**
 * Drops the parent-to-child split metadata of the given parent shard from the index metadata builder.
 *
 * @param oldIndexMetadata     index metadata used to create a builder when none has been created yet
 * @param indexMetadataBuilder builder to update, or {@code null} if no builder exists so far
 * @param parentShardId        shard id of the parent shard whose split metadata is removed
 * @return the builder that was updated; callers must use it because it may be newly created here
 */
private IndexMetadata.Builder updateMetadataForInPlaceSplitFailed(
    IndexMetadata oldIndexMetadata,
    IndexMetadata.Builder indexMetadataBuilder,
    ShardId parentShardId
) {
    final IndexMetadata.Builder builder = indexMetadataBuilder != null
        ? indexMetadataBuilder
        : IndexMetadata.builder(oldIndexMetadata);
    return builder.removeParentToChildShardMetadata(parentShardId.id());
}

/**
* Helper method that creates update entry for the given shard id if such an entry does not exist yet.
*/
Expand Down Expand Up @@ -420,6 +440,7 @@ private static class Updates {
// and update number of current shards.
private Map<ShardId, ShardRouting> addedChildShards = new HashMap<>();
private boolean isNewChildShard;
private boolean splitFailed;
private Set<String> addedAllocationIds = new HashSet<>(); // allocation ids that should be added to the in-sync set
private Set<String> removedAllocationIds = new HashSet<>(); // allocation ids that should be removed from the in-sync set
private ShardRouting initializedPrimary = null; // primary that was initialized from unassigned
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ public void splitCompleted(ShardRouting removedSplitSource, IndexMetadata indexM
setChanged();
}

/**
* Flags this observer as changed (via {@code setChanged()}) when an in-place split has failed.
*
* @param splitSource shard that was being split; asserted to be in the splitting state
* @param indexMetadata not used by this implementation
*/
@Override
public void splitFailed(ShardRouting splitSource, IndexMetadata indexMetadata) {
assert splitSource.splitting() : "expected splitting shard " + splitSource;
setChanged();
}

@Override
public void replicaPromoted(ShardRouting replicaShard) {
assert replicaShard.started() && replicaShard.primary() == false : "expected started replica shard " + replicaShard;
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,8 @@ private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store
if (indexShard != null) {
try {
// only flush if we are closed (closed index or shutdown) and if we are not deleted
final boolean flushEngine = deleted.get() == false && closed.get();
final boolean flushEngine = deleted.get() == false && closed.get() &&
indexShard.routingEntry().isSplitTarget() == false;
indexShard.close(reason, flushEngine, deleted.get());
} catch (Exception e) {
logger.debug(() -> new ParameterizedMessage("[{}] failed to close index shard", shardId), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1071,7 +1071,6 @@ public void createChildShardsForSplit(
assert indexService != null;
IndexShard parentShard = indexService.getShard(parentShardId.id());


List<InPlaceShardRecoveryContext> recoveryContexts = new ArrayList<>();
List<ShardId> shardIds = new ArrayList<>();
for (ShardRouting shardRouting : shardRoutings) {
Expand Down
Loading

0 comments on commit 0c51aed

Please sign in to comment.