Skip to content

Commit

Permalink
Changed the bytes-to-download calculation based on review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Vinay Krishna Pudyodu <[email protected]>
  • Loading branch information
vinaykpud committed Dec 6, 2024
1 parent d3a8a89 commit dc8db3c
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 216 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add vertical scaling and SoftReference for snapshot repository data cache ([#16489](https://github.com/opensearch-project/OpenSearch/pull/16489))
- Support prefix list for remote repository attributes ([#16271](https://github.com/opensearch-project/OpenSearch/pull/16271))
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
- Added implementation for the stats calculation for search and regular replica in shards ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678))
- Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678))

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,9 +441,7 @@ protected Settings featureFlagSettings() {

public void testSegmentReplicationStatsResponseWithSearchReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNodes(3);

int numShards = 2;
assertAcked(
Expand Down Expand Up @@ -472,25 +470,22 @@ public void testSegmentReplicationStatsResponseWithSearchReplica() throws Except
.setDetailed(true)
.execute()
.actionGet();
assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().size());
assertEquals(numShards * 3, segmentReplicationStatsResponse.getTotalShards());
assertEquals(numShards * 3, segmentReplicationStatsResponse.getSuccessfulShards());

SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0);
final SegmentReplicationState currentReplicationState = perGroupStats.getReplicaStats()
.stream()
.findFirst()
.get()
.getCurrentReplicationState();
assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1);
assertEquals(segmentReplicationStatsResponse.getTotalShards(), numShards * 3);
assertEquals(segmentReplicationStatsResponse.getSuccessfulShards(), numShards * 3);
assertNotNull(currentReplicationState);
assertEquals(currentReplicationState.getStage(), SegmentReplicationState.Stage.DONE);
assertTrue(currentReplicationState.getIndex().recoveredFileCount() > 0);
Set<SegmentReplicationShardStats> replicaStats = perGroupStats.getReplicaStats();
for (SegmentReplicationShardStats replica : replicaStats) {
assertNotNull(replica.getCurrentReplicationState());
}
assertEquals(3, replicaStats.size());
}, 1, TimeUnit.MINUTES);
}

public void testSegmentReplicationStatsResponseWithOnlySearchReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNodes(2);

int numShards = 1;
assertAcked(
Expand Down Expand Up @@ -519,18 +514,16 @@ public void testSegmentReplicationStatsResponseWithOnlySearchReplica() throws Ex
.setDetailed(true)
.execute()
.actionGet();
SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0);
final SegmentReplicationState currentReplicationState = perGroupStats.getReplicaStats()
.stream()
.findFirst()
.get()
.getCurrentReplicationState();
assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1);
assertEquals(segmentReplicationStatsResponse.getTotalShards(), 2);
assertEquals(segmentReplicationStatsResponse.getSuccessfulShards(), 2);
assertNotNull(currentReplicationState);
assertEquals(currentReplicationState.getStage(), SegmentReplicationState.Stage.DONE);
assertTrue(currentReplicationState.getIndex().recoveredFileCount() > 0);

SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0);
Set<SegmentReplicationShardStats> replicaStats = perGroupStats.getReplicaStats();
for (SegmentReplicationShardStats replica : replicaStats) {
assertNotNull(replica.getCurrentReplicationState());
}
assertEquals(1, replicaStats.size());
}, 1, TimeUnit.MINUTES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,30 @@ public class SegmentReplicationShardStatsResponse implements Writeable {
private final SegmentReplicationState replicaStats;

@Nullable
private final SegmentReplicationShardStats segmentReplicationShardStats;
private final SegmentReplicationShardStats searchReplicaReplicationStats;

public SegmentReplicationShardStatsResponse(StreamInput in) throws IOException {
this.primaryStats = in.readOptionalWriteable(SegmentReplicationPerGroupStats::new);
this.replicaStats = in.readOptionalWriteable(SegmentReplicationState::new);
this.segmentReplicationShardStats = in.readOptionalWriteable(SegmentReplicationShardStats::new);
this.searchReplicaReplicationStats = in.readOptionalWriteable(SegmentReplicationShardStats::new);
}

public SegmentReplicationShardStatsResponse(SegmentReplicationPerGroupStats primaryStats) {
this.primaryStats = primaryStats;
this.replicaStats = null;
this.segmentReplicationShardStats = null;
this.searchReplicaReplicationStats = null;
}

public SegmentReplicationShardStatsResponse(SegmentReplicationState replicaStats) {
this.replicaStats = replicaStats;
this.primaryStats = null;
this.segmentReplicationShardStats = null;
this.searchReplicaReplicationStats = null;
}

public SegmentReplicationShardStatsResponse(SegmentReplicationShardStats segmentReplicationShardStats) {
this.primaryStats = null;
this.replicaStats = null;
this.segmentReplicationShardStats = segmentReplicationShardStats;
this.searchReplicaReplicationStats = segmentReplicationShardStats;
}

public SegmentReplicationPerGroupStats getPrimaryStats() {
Expand All @@ -67,19 +67,23 @@ public SegmentReplicationState getReplicaStats() {
return replicaStats;
}

public SegmentReplicationShardStats getSegmentReplicationShardStats() {
return segmentReplicationShardStats;
public SegmentReplicationShardStats getSearchReplicaReplicationStats() {
return searchReplicaReplicationStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(primaryStats);
out.writeOptionalWriteable(replicaStats);
out.writeOptionalWriteable(segmentReplicationShardStats);
out.writeOptionalWriteable(searchReplicaReplicationStats);
}

@Override
public String toString() {
return "SegmentReplicationShardStatsResponse{" + "primaryStats=" + primaryStats + ", replicaStats=" + replicaStats + '}';
return "SegmentReplicationShardStatsResponse{" +
"primaryStats=" + primaryStats +
", replicaStats=" + replicaStats +
", searchReplicaReplicationStats=" + searchReplicaReplicationStats +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
Expand All @@ -36,7 +34,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -117,8 +114,8 @@ protected SegmentReplicationStatsResponse newResponse(
}
}

if (response.getSegmentReplicationShardStats() != null) {
searchReplicaSegRepShardStats.add(response.getSegmentReplicationShardStats());
if (response.getSearchReplicaReplicationStats() != null) {
searchReplicaSegRepShardStats.add(response.getSearchReplicaReplicationStats());
}

if (response.getPrimaryStats() != null) {
Expand Down Expand Up @@ -175,7 +172,7 @@ protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplication
if (shardRouting.primary()) {
return new SegmentReplicationShardStatsResponse(pressureService.getStatsForShard(indexShard));
} else if (shardRouting.isSearchOnly()) {
SegmentReplicationShardStats segmentReplicationShardStats = calcualteSegmentReplicationShardStats(shardRouting);
SegmentReplicationShardStats segmentReplicationShardStats = calculateSegmentReplicationShardStats(shardRouting);
return new SegmentReplicationShardStatsResponse(segmentReplicationShardStats);
} else {
return new SegmentReplicationShardStatsResponse(getSegmentReplicationState(shardId, request.activeOnly()));
Expand All @@ -201,15 +198,15 @@ protected ClusterBlockException checkRequestBlock(
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
}

private SegmentReplicationShardStats calcualteSegmentReplicationShardStats(ShardRouting shardRouting) {
private SegmentReplicationShardStats calculateSegmentReplicationShardStats(ShardRouting shardRouting) {
ShardId shardId = shardRouting.shardId();
SegmentReplicationState completedSegmentReplicationState = targetService.getlatestCompletedEventSegmentReplicationState(shardId);
SegmentReplicationState ongoingSegmentReplicationState = targetService.getOngoingEventSegmentReplicationState(shardId);

SegmentReplicationShardStats segmentReplicationShardStats = new SegmentReplicationShardStats(
shardRouting.allocationId().getId(),
calculateCheckpointsBehind(completedSegmentReplicationState, ongoingSegmentReplicationState),
calculateBytesBehind(completedSegmentReplicationState, ongoingSegmentReplicationState),
0,
calculateBytesRemainingToReplicate(ongoingSegmentReplicationState),
0,
getCurrentReplicationLag(ongoingSegmentReplicationState),
getLastCompletedReplicationLag(completedSegmentReplicationState)
Expand All @@ -227,44 +224,16 @@ private SegmentReplicationState getSegmentReplicationState(ShardId shardId, bool
}
}

private long calculateCheckpointsBehind(
SegmentReplicationState completedSegmentReplicationState,
SegmentReplicationState ongoingSegmentReplicationState
) {
if (ongoingSegmentReplicationState == null || ongoingSegmentReplicationState.getReplicationCheckpoint() == null) {
return 0;
}

if (completedSegmentReplicationState == null || completedSegmentReplicationState.getReplicationCheckpoint() == null) {
return ongoingSegmentReplicationState.getReplicationCheckpoint().getSegmentInfosVersion();
}

return ongoingSegmentReplicationState.getReplicationCheckpoint().getSegmentInfosVersion() - completedSegmentReplicationState
.getReplicationCheckpoint()
.getSegmentInfosVersion();
}

private long calculateBytesBehind(
SegmentReplicationState completedSegmentReplicationState,
private long calculateBytesRemainingToReplicate(
SegmentReplicationState ongoingSegmentReplicationState
) {
if (ongoingSegmentReplicationState == null || ongoingSegmentReplicationState.getReplicationCheckpoint() == null) {
if (ongoingSegmentReplicationState == null) {
return 0;
}

if (completedSegmentReplicationState == null || completedSegmentReplicationState.getReplicationCheckpoint() == null) {
Store.RecoveryDiff diff = Store.segmentReplicationDiff(
ongoingSegmentReplicationState.getReplicationCheckpoint().getMetadataMap(),
Collections.emptyMap()
);
return diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
}

Store.RecoveryDiff diff = Store.segmentReplicationDiff(
ongoingSegmentReplicationState.getReplicationCheckpoint().getMetadataMap(),
completedSegmentReplicationState.getReplicationCheckpoint().getMetadataMap()
);
return diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
return ongoingSegmentReplicationState.getIndex()
.fileDetails().stream()
.mapToLong(index -> index.length() - index.recovered())
.sum();
}

private long getCurrentReplicationLag(SegmentReplicationState ongoingSegmentReplicationState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("current_replication_lag", new TimeValue(currentReplicationLagMillis));
builder.field("last_completed_replication_time", new TimeValue(lastCompletedReplicationTimeMillis));
if (currentReplicationState != null) {
builder.startObject("current_replication_state");
builder.startObject();
currentReplicationState.toXContent(builder, params);
builder.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,7 @@ && shouldSkipReplicationTimer(entry.getKey()) == false
&& isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(entry.getKey()).currentNodeId())))
)
.map(entry -> buildShardStats(entry.getKey(), entry.getValue()))
.collect(Collectors.toUnmodifiableSet());
.collect(Collectors.toSet());
}
return Collections.emptySet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.indices.replication.common.ReplicationTimer;
Expand Down Expand Up @@ -89,8 +88,6 @@ public static Stage fromId(byte id) {
private String sourceDescription;
private DiscoveryNode targetNode;

private ReplicationCheckpoint replicationCheckpoint;

public ShardRouting getShardRouting() {
return shardRouting;
}
Expand Down Expand Up @@ -151,10 +148,6 @@ public TimeValue getFinalizeReplicationStageTime() {
return new TimeValue(time);
}

public ReplicationCheckpoint getReplicationCheckpoint() {
return this.replicationCheckpoint;
}

public SegmentReplicationState(
ShardRouting shardRouting,
ReplicationLuceneIndex index,
Expand Down Expand Up @@ -259,10 +252,6 @@ public void setStage(Stage stage) {
}
}

public void setReplicationCheckpoint(ReplicationCheckpoint replicationCheckpoint) {
this.replicationCheckpoint = replicationCheckpoint;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ public void startReplication(ActionListener<Void> listener) {
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);

checkpointInfoListener.whenComplete(checkpointInfo -> {
state.setReplicationCheckpoint(checkpointInfo.getCheckpoint());
final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo);
state.setStage(SegmentReplicationState.Stage.GET_FILES);
cancellableThreads.checkForCancel();
Expand Down
Loading

0 comments on commit dc8db3c

Please sign in to comment.