Skip to content

Commit

Permalink
Added unit tests for TransportSegmentReplicationStatsAction
Browse files Browse the repository at this point in the history
Signed-off-by: Vinay Krishna Pudyodu <[email protected]>
  • Loading branch information
vinaykpud committed Nov 22, 2024
1 parent 39576c3 commit 803c15b
Show file tree
Hide file tree
Showing 4 changed files with 556 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.opensearch.core.action.support.DefaultShardOperationFailedException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationShardStats;
Expand All @@ -31,13 +30,13 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -166,9 +165,10 @@ protected SegmentReplicationStatsRequest readRequestFrom(StreamInput in) throws

@Override
protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplicationStatsRequest request, ShardRouting shardRouting) {
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
ShardId shardId = shardRouting.shardId();
IndexShard indexShard = indicesService
.indexServiceSafe(shardId.getIndex())
.getShard(shardId.id());

if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() == false) {
return null;
Expand All @@ -177,12 +177,7 @@ protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplication
if (shardRouting.primary()) {
return new SegmentReplicationShardStatsResponse(pressureService.getStatsForShard(indexShard));
} else if (shardRouting.isSearchOnly()) {
SegmentReplicationShardStats segmentReplicationShardStats = calcualteSegmentReplicationShardStats(
shardRouting,
indexShard,
shardId,
request.activeOnly()
);
SegmentReplicationShardStats segmentReplicationShardStats = calcualteSegmentReplicationShardStats(shardRouting);
return new SegmentReplicationShardStatsResponse(segmentReplicationShardStats);
} else {
return new SegmentReplicationShardStatsResponse(getSegmentReplicationState(shardId, request.activeOnly()));
Expand All @@ -208,31 +203,22 @@ protected ClusterBlockException checkRequestBlock(
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
}

private SegmentReplicationShardStats calcualteSegmentReplicationShardStats(
ShardRouting shardRouting,
IndexShard indexShard,
ShardId shardId,
boolean isActiveOnly
) {
ReplicationCheckpoint indexReplicationCheckpoint = indexShard.getLatestReplicationCheckpoint();
SegmentReplicationState segmentReplicationState = getSegmentReplicationState(shardId, isActiveOnly);
if (segmentReplicationState != null) {
ReplicationCheckpoint latestReplicationCheckpointReceived = segmentReplicationState.getLatestReplicationCheckpoint();

SegmentReplicationShardStats segmentReplicationShardStats = new SegmentReplicationShardStats(
shardRouting.allocationId().getId(),
calculateCheckpointsBehind(indexReplicationCheckpoint, latestReplicationCheckpointReceived),
calculateBytesBehind(indexReplicationCheckpoint, latestReplicationCheckpointReceived),
0,
calculateCurrentReplicationLag(shardId),
getLastCompletedReplicationLag(shardId)
);
private SegmentReplicationShardStats calcualteSegmentReplicationShardStats(ShardRouting shardRouting) {
ShardId shardId = shardRouting.shardId();
SegmentReplicationState completedSegmentReplicationState = targetService.getlatestCompletedEventSegmentReplicationState(shardId);
SegmentReplicationState ongoingSegmentReplicationState = targetService.getOngoingEventSegmentReplicationState(shardId);

segmentReplicationShardStats.setCurrentReplicationState(segmentReplicationState);
return segmentReplicationShardStats;
} else {
return new SegmentReplicationShardStats(shardRouting.allocationId().getId(), 0, 0, 0, 0, 0);
}
SegmentReplicationShardStats segmentReplicationShardStats = new SegmentReplicationShardStats(
shardRouting.allocationId().getId(),
calculateCheckpointsBehind(completedSegmentReplicationState, ongoingSegmentReplicationState),
calculateBytesBehind(completedSegmentReplicationState, ongoingSegmentReplicationState),
0,
getCurrentReplicationLag(ongoingSegmentReplicationState),
getLastCompletedReplicationLag(completedSegmentReplicationState)
);

segmentReplicationShardStats.setCurrentReplicationState(targetService.getSegmentReplicationState(shardId));
return segmentReplicationShardStats;
}

private SegmentReplicationState getSegmentReplicationState(ShardId shardId, boolean isActiveOnly) {
Expand All @@ -244,38 +230,54 @@ private SegmentReplicationState getSegmentReplicationState(ShardId shardId, bool
}

private long calculateCheckpointsBehind(
ReplicationCheckpoint indexReplicationCheckpoint,
ReplicationCheckpoint latestReplicationCheckpointReceived
SegmentReplicationState completedSegmentReplicationState,
SegmentReplicationState ongoingSegmentReplicationState
) {
if (latestReplicationCheckpointReceived != null) {
return latestReplicationCheckpointReceived.getSegmentInfosVersion() - indexReplicationCheckpoint.getSegmentInfosVersion();
if (ongoingSegmentReplicationState == null || ongoingSegmentReplicationState.getReplicationCheckpoint() == null) {
return 0;
}
return 0;

if(completedSegmentReplicationState == null ||
completedSegmentReplicationState.getReplicationCheckpoint() == null) {
return ongoingSegmentReplicationState
.getReplicationCheckpoint()
.getSegmentInfosVersion();
}

return ongoingSegmentReplicationState.getReplicationCheckpoint().getSegmentInfosVersion() -
completedSegmentReplicationState.getReplicationCheckpoint().getSegmentInfosVersion();
}

private long calculateBytesBehind(
ReplicationCheckpoint indexReplicationCheckpoint,
ReplicationCheckpoint latestReplicationCheckpointReceived
SegmentReplicationState completedSegmentReplicationState,
SegmentReplicationState ongoingSegmentReplicationState
) {
if (latestReplicationCheckpointReceived != null) {
if (ongoingSegmentReplicationState == null ||
ongoingSegmentReplicationState.getReplicationCheckpoint() == null) {
return 0;
}

if (completedSegmentReplicationState == null ||
completedSegmentReplicationState.getReplicationCheckpoint() == null) {
Store.RecoveryDiff diff = Store.segmentReplicationDiff(
latestReplicationCheckpointReceived.getMetadataMap(),
indexReplicationCheckpoint.getMetadataMap()
ongoingSegmentReplicationState.getReplicationCheckpoint().getMetadataMap(),
Collections.emptyMap()
);
return diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
}
return 0;

Store.RecoveryDiff diff = Store.segmentReplicationDiff(
ongoingSegmentReplicationState.getReplicationCheckpoint().getMetadataMap(),
completedSegmentReplicationState.getReplicationCheckpoint().getMetadataMap()
);
return diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
}

private long calculateCurrentReplicationLag(ShardId shardId) {
SegmentReplicationState ongoingEventSegmentReplicationState = targetService.getOngoingEventSegmentReplicationState(shardId);
return ongoingEventSegmentReplicationState != null ? ongoingEventSegmentReplicationState.getTimer().time() : 0;
private long getCurrentReplicationLag(SegmentReplicationState ongoingSegmentReplicationState) {
return ongoingSegmentReplicationState != null ? ongoingSegmentReplicationState.getTimer().time() : 0;
}

private long getLastCompletedReplicationLag(ShardId shardId) {
SegmentReplicationState lastCompletedSegmentReplicationState = targetService.getlatestCompletedEventSegmentReplicationState(
shardId
);
return lastCompletedSegmentReplicationState != null ? lastCompletedSegmentReplicationState.getTimer().time() : 0;
private long getLastCompletedReplicationLag(SegmentReplicationState completedSegmentReplicationState) {
return completedSegmentReplicationState != null ? completedSegmentReplicationState.getTimer().time() : 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static Stage fromId(byte id) {
private String sourceDescription;
private DiscoveryNode targetNode;

private ReplicationCheckpoint latestReplicationCheckpoint;
private ReplicationCheckpoint replicationCheckpoint;

public ShardRouting getShardRouting() {
return shardRouting;
Expand Down Expand Up @@ -151,8 +151,8 @@ public TimeValue getFinalizeReplicationStageTime() {
return new TimeValue(time);
}

public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return this.latestReplicationCheckpoint;
public ReplicationCheckpoint getReplicationCheckpoint() {
return this.replicationCheckpoint;
}

public SegmentReplicationState(
Expand Down Expand Up @@ -259,8 +259,8 @@ public void setStage(Stage stage) {
}
}

public void setLatestReplicationCheckpoint(ReplicationCheckpoint latestReplicationCheckpoint) {
this.latestReplicationCheckpoint = latestReplicationCheckpoint;
public void setReplicationCheckpoint(ReplicationCheckpoint replicationCheckpoint) {
this.replicationCheckpoint = replicationCheckpoint;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void startReplication(ActionListener<Void> listener) {
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);

checkpointInfoListener.whenComplete(checkpointInfo -> {
state.setLatestReplicationCheckpoint(checkpointInfo.getCheckpoint());
state.setReplicationCheckpoint(checkpointInfo.getCheckpoint());
final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo);
state.setStage(SegmentReplicationState.Stage.GET_FILES);
cancellableThreads.checkForCancel();
Expand Down
Loading

0 comments on commit 803c15b

Please sign in to comment.