From 44a1134c50b2528e45821ed738a62d9ccd9c7d7d Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Sat, 18 Jan 2025 18:52:41 -0800 Subject: [PATCH 1/9] Implemented computation of segment replication stats at shard level The method implemented here computes the segment replication stats at the shard level, instead of relying on the primary shard to compute stats based on reports from its replicas. Signed-off-by: Vinay Krishna Pudyodu --- .../opensearch/index/shard/IndexShardIT.java | 5 +- .../org/opensearch/index/IndexModule.java | 11 ++- .../org/opensearch/index/IndexService.java | 12 ++- .../opensearch/index/shard/IndexShard.java | 23 ++--- .../metadata/RemoteSegmentMetadata.java | 4 +- .../opensearch/indices/IndicesService.java | 11 ++- .../replication/SegmentReplicationTarget.java | 10 +- .../SegmentReplicationTargetService.java | 1 + .../replication/SegmentReplicator.java | 68 +++++++++++++- .../checkpoint/ReplicationCheckpoint.java | 16 +++- .../main/java/org/opensearch/node/Node.java | 3 +- .../opensearch/index/IndexModuleTests.java | 3 +- .../index/seqno/ReplicationTrackerTests.java | 15 ++- .../SegmentReplicationTargetTests.java | 15 +-- .../replication/SegmentReplicatorTests.java | 93 +++++++++++++++++++ .../replication/common/CopyStateTests.java | 3 +- .../index/shard/IndexShardTestCase.java | 17 ++-- 17 files changed, 262 insertions(+), 48 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index f97950f2652a3..41d00b94877af 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -114,6 +114,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.stream.Stream; @@ -136,6 +137,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength; +import static org.mockito.Mockito.mock; public class IndexShardIT extends OpenSearchSingleNodeTestCase { @@ -716,7 +718,8 @@ public static final IndexShard newIndexShard( null, DefaultRemoteStoreSettings.INSTANCE, false, - IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting) + IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting), + mock(BiFunction.class) ); } diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 79de97dc96fba..c55e5e1c90778 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -59,6 +59,7 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.indices.breaker.CircuitBreakerService; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.NodeEnvironment; @@ -87,6 +88,7 @@ import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; @@ -653,7 +655,8 @@ public IndexService newIndexService( clusterDefaultRefreshIntervalSupplier, recoverySettings, remoteStoreSettings, - (s) -> {} + (s) -> {}, + null ); } @@ -679,7 +682,8 @@ public IndexService newIndexService( Supplier clusterDefaultRefreshIntervalSupplier, RecoverySettings recoverySettings, RemoteStoreSettings remoteStoreSettings, - Consumer replicator + Consumer replicator, + BiFunction segmentReplicationStatsProvider ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper @@ -741,7 +745,8 @@ public IndexService newIndexService( remoteStoreSettings, fileCache, compositeIndexSettings, - replicator + replicator, + segmentReplicationStatsProvider ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index f1b36194bf62d..69a950e604ecb 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -105,6 +105,7 @@ import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.plugins.IndexStorePlugin; @@ -197,6 +198,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final FileCache fileCache; private final CompositeIndexSettings compositeIndexSettings; private final Consumer replicator; + private final BiFunction segmentReplicationStatsProvider; public IndexService( IndexSettings indexSettings, @@ -235,7 +237,8 @@ public IndexService( RemoteStoreSettings remoteStoreSettings, FileCache fileCache, CompositeIndexSettings compositeIndexSettings, - Consumer replicator + Consumer replicator, + BiFunction segmentReplicationStatsProvider ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; @@ -319,6 +322,7 @@ public IndexService( this.compositeIndexSettings = compositeIndexSettings; this.fileCache = fileCache; this.replicator = replicator; + this.segmentReplicationStatsProvider = segmentReplicationStatsProvider; updateFsyncTaskIfNecessary(); } @@ -395,7 +399,8 @@ public IndexService( remoteStoreSettings, null, null, - s -> {} + s -> {}, + null ); } @@ -691,7 +696,8 @@ protected void closeInternal() { recoverySettings, remoteStoreSettings, seedRemote, - discoveryNodes + discoveryNodes, + segmentReplicationStatsProvider ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index eb3999718ca5b..50f43ff7ec94d 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -361,6 +361,7 @@ Runnable getGlobalCheckpointSyncer() { */ private final ShardMigrationState shardMigrationState; private DiscoveryNodes discoveryNodes; + private final BiFunction segmentReplicationStatsProvider; public IndexShard( final ShardRouting shardRouting, @@ -391,7 +392,8 @@ public IndexShard( final RecoverySettings recoverySettings, final RemoteStoreSettings remoteStoreSettings, boolean seedRemote, - final DiscoveryNodes discoveryNodes + final DiscoveryNodes discoveryNodes, + final BiFunction segmentReplicationStatsProvider ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -493,6 +495,7 @@ public boolean shouldCache(Query query) { this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings); this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote); this.discoveryNodes = discoveryNodes; + this.segmentReplicationStatsProvider = segmentReplicationStatsProvider; } public ThreadPool getThreadPool() { @@ -1768,7 +1771,8 @@ ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) th segmentInfos.getVersion(), metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(), getEngine().config().getCodec().getName(), - metadataMap + metadataMap, + System.currentTimeMillis() ); logger.trace("Recomputed ReplicationCheckpoint for shard {}", checkpoint); return checkpoint; @@ -3209,17 +3213,10 @@ public Set getReplicationStatsForTrackedReplicas() } public ReplicationStats getReplicationStats() { - if (indexSettings.isSegRepEnabledOrRemoteNode() && routingEntry().primary()) { - final Set stats = getReplicationStatsForTrackedReplicas(); - long maxBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).max().orElse(0L); - long totalBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).sum(); - long maxReplicationLag = stats.stream() - .mapToLong(SegmentReplicationShardStats::getCurrentReplicationLagMillis) - .max() - .orElse(0L); - return new ReplicationStats(maxBytesBehind, totalBytesBehind, maxReplicationLag); - } - return new ReplicationStats(); + if (indexSettings.isSegRepEnabledOrRemoteNode() && !routingEntry().primary()) { + return segmentReplicationStatsProvider.apply(shardId, this.getLatestReplicationCheckpoint()); + } + return new ReplicationStats(0, 0, 0); } /** diff --git a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java index 41a145273e8ef..c04ede4b443a1 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java +++ b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java @@ -136,6 +136,7 @@ public static void writeCheckpointToIndexOutput(ReplicationCheckpoint replicatio out.writeLong(replicationCheckpoint.getSegmentInfosVersion()); out.writeLong(replicationCheckpoint.getLength()); out.writeString(replicationCheckpoint.getCodec()); + out.writeLong(replicationCheckpoint.getCreatedTimeStamp()); } private static ReplicationCheckpoint readCheckpointFromIndexInput( @@ -149,7 +150,8 @@ private static ReplicationCheckpoint readCheckpointFromIndexInput( in.readLong(), in.readLong(), in.readString(), - toStoreFileMetadata(uploadedSegmentMetadataMap) + toStoreFileMetadata(uploadedSegmentMetadataMap), + in.readLong() ); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index b9bad5527e3f4..cbfcfb45d4aa2 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -105,6 +105,7 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.compositeindex.CompositeIndexSettings; @@ -150,6 +151,7 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.Node; @@ -361,6 +363,7 @@ public class IndicesService extends AbstractLifecycleComponent private final FileCache fileCache; private final CompositeIndexSettings compositeIndexSettings; private final Consumer replicator; + private final BiFunction segmentReplicationStatsProvider; private volatile int maxSizeInRequestCache; @Override @@ -399,7 +402,8 @@ public IndicesService( RemoteStoreSettings remoteStoreSettings, FileCache fileCache, CompositeIndexSettings compositeIndexSettings, - Consumer replicator + Consumer replicator, + BiFunction segmentReplicationStatsProvider ) { this.settings = settings; this.threadPool = threadPool; @@ -509,6 +513,7 @@ protected void closeInternal() { this.compositeIndexSettings = compositeIndexSettings; this.fileCache = fileCache; this.replicator = replicator; + this.segmentReplicationStatsProvider = segmentReplicationStatsProvider; this.maxSizeInRequestCache = INDICES_REQUEST_CACHE_MAX_SIZE_ALLOWED_IN_CACHE_SETTING.get(clusterService.getSettings()); clusterService.getClusterSettings() .addSettingsUpdateConsumer(INDICES_REQUEST_CACHE_MAX_SIZE_ALLOWED_IN_CACHE_SETTING, this::setMaxSizeInRequestCache); @@ -573,6 +578,7 @@ public IndicesService( remoteStoreSettings, null, null, + null, null ); } @@ -990,7 +996,8 @@ private synchronized IndexService createIndexService( this::getClusterDefaultRefreshInterval, this.recoverySettings, this.remoteStoreSettings, - replicator + replicator, + segmentReplicationStatsProvider ); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 7131b49a41834..8dfc4f45bc3c9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -24,6 +24,7 @@ import org.opensearch.common.util.CancellableThreads; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; @@ -39,6 +40,7 @@ import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.function.BiConsumer; import java.util.stream.Collectors; /** @@ -139,6 +141,10 @@ public ReplicationCheckpoint getCheckpoint() { return this.checkpoint; } + public SegmentReplicationSource getSource() { + return source; + } + @Override public void writeFileChunk( StoreFileMetadata metadata, @@ -161,7 +167,7 @@ public void writeFileChunk( * * @param listener {@link ActionListener} listener. */ - public void startReplication(ActionListener listener) { + public void startReplication(ActionListener listener, BiConsumer checkpointUpdater) { cancellableThreads.setOnCancel((reason, beforeCancelEx) -> { throw new CancellableThreads.ExecutionCancelledException("replication was canceled reason [" + reason + "]"); }); @@ -177,6 +183,8 @@ public void startReplication(ActionListener listener) { source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener); checkpointInfoListener.whenComplete(checkpointInfo -> { + checkpointUpdater.accept(checkpointInfo.getCheckpoint(), this.indexShard.shardId()); + final List filesToFetch = getFiles(checkpointInfo); state.setStage(SegmentReplicationState.Stage.GET_FILES); cancellableThreads.checkForCancel(); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 8fee3f671ecc9..bd11f1e3d02ec 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -294,6 +294,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe return; } updateLatestReceivedCheckpoint(receivedCheckpoint, replicaShard); + replicator.updatePrimaryLastRefreshedCheckpoint(receivedCheckpoint, replicaShard.shardId()); // Checks if replica shard is in the correct STARTED state to process checkpoints (avoids parallel replication events taking place) // This check ensures we do not try to process a received checkpoint while the shard is still recovering, yet we stored the latest // checkpoint to be replayed once the shard is Active. diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java index ad3bc1933208c..c540fa37607eb 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java @@ -13,14 +13,17 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CorruptIndexException; import org.opensearch.OpenSearchCorruptionException; +import org.opensearch.action.StepListener; import org.opensearch.common.SetOnce; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.indices.replication.common.ReplicationFailedException; @@ -29,6 +32,7 @@ import java.io.IOException; import java.util.Map; +import java.util.Objects; /** * This class is responsible for managing segment replication events on replicas. @@ -43,8 +47,9 @@ public class SegmentReplicator { private final ReplicationCollection onGoingReplications; private final Map completedReplications = ConcurrentCollections.newConcurrentMap(); + private final Map primaryLastRefreshedCheckpoint = ConcurrentCollections.newConcurrentMap(); + private final Map lastOnGoingReplicationCheckpoint = ConcurrentCollections.newConcurrentMap(); private final ThreadPool threadPool; - private final SetOnce sourceFactory; public SegmentReplicator(ThreadPool threadPool) { @@ -102,6 +107,50 @@ SegmentReplicationTarget startReplication( return target; } + public ReplicationStats getSegmentReplicationStats(ShardId shardId, ReplicationCheckpoint indexReplicationCheckPoint) { + assert shardId != null : "shardId cannot be null"; + assert indexReplicationCheckPoint != null : "indexReplicationCheckPoint cannot be null"; + ; + final Map indexStoreFileMetadata = indexReplicationCheckPoint.getMetadataMap(); + // If primaryLastRefreshedCheckpoint is null, we will default to indexReplicationCheckPoint + // so that we can avoid any failures + final ReplicationCheckpoint primaryLastRefreshedCheckpoint = Objects.requireNonNullElse( + this.primaryLastRefreshedCheckpoint.get(shardId), + indexReplicationCheckPoint + ); + final Map storeFileMetadata = primaryLastRefreshedCheckpoint.getMetadataMap(); + + final Store.RecoveryDiff diff = Store.segmentReplicationDiff(storeFileMetadata, indexStoreFileMetadata); + long bytesBehindSum = diff.missing.stream().mapToLong(StoreFileMetadata::length).sum(); + + final ReplicationCheckpoint lastOnGoingReplicationCheckpoint = this.lastOnGoingReplicationCheckpoint.get(shardId); + final long replicationLag = lastOnGoingReplicationCheckpoint != null + ? System.currentTimeMillis() - lastOnGoingReplicationCheckpoint.getCreatedTimeStamp() + : 0; + + return new ReplicationStats(bytesBehindSum, bytesBehindSum, bytesBehindSum > 0L ? replicationLag : 0); + } + + public void updatePrimaryLastRefreshedCheckpoint(ReplicationCheckpoint replicationCheckpoint, ShardId shardId) { + updateCheckpointIfAhead(primaryLastRefreshedCheckpoint, replicationCheckpoint, shardId); + } + + public void updateReplicationCheckpoints(ReplicationCheckpoint replicationCheckpoint, ShardId shardId) { + updateCheckpointIfAhead(lastOnGoingReplicationCheckpoint, replicationCheckpoint, shardId); + updatePrimaryLastRefreshedCheckpoint(replicationCheckpoint, shardId); + } + + private void updateCheckpointIfAhead( + Map checkpointMap, + ReplicationCheckpoint newCheckpoint, + ShardId shardId + ) { + final ReplicationCheckpoint existingCheckpoint = checkpointMap.get(shardId); + if (existingCheckpoint == null || newCheckpoint.isAheadOf(existingCheckpoint)) { + checkpointMap.put(shardId, newCheckpoint); + } + } + /** * Runnable implementation to trigger a replication event. */ @@ -153,7 +202,7 @@ public void onFailure(Exception e) { } onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), false); } - }); + }, this::updateReplicationCheckpoints); } // pkg-private for integration tests @@ -163,6 +212,7 @@ void startReplication(final SegmentReplicationTarget target, TimeValue timeout) replicationId = onGoingReplications.startSafe(target, timeout); } catch (ReplicationFailedException e) { // replication already running for shard. + fetchPrimaryLastRefreshedCheckpoint(target); target.fail(e, false); return; } @@ -170,6 +220,20 @@ void startReplication(final SegmentReplicationTarget target, TimeValue timeout) threadPool.generic().execute(new ReplicationRunner(replicationId)); } + private void fetchPrimaryLastRefreshedCheckpoint(SegmentReplicationTarget target) { + // Only process search-only shards + if (!target.indexShard().routingEntry().isSearchOnly()) { + return; + } + + final StepListener checkpointInfoListener = new StepListener<>(); + target.getSource().getCheckpointMetadata(target.getId(), target.getCheckpoint(), checkpointInfoListener); + checkpointInfoListener.whenComplete( + checkpointInfo -> updatePrimaryLastRefreshedCheckpoint(checkpointInfo.getCheckpoint(), target.indexShard().shardId()), + checkpointInfoListener::onFailure + ); + } + private boolean isStoreCorrupt(SegmentReplicationTarget target) { // ensure target is not already closed. In that case // we can assume the store is not corrupt and that the replication diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 29410159a4955..009678b7502f8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -38,6 +38,7 @@ public class ReplicationCheckpoint implements Writeable, Comparable metadataMap; + private final long createdTimeStamp; public static ReplicationCheckpoint empty(ShardId shardId) { return empty(shardId, ""); @@ -55,10 +56,11 @@ private ReplicationCheckpoint(ShardId shardId, String codec) { length = 0L; this.codec = codec; this.metadataMap = Collections.emptyMap(); + this.createdTimeStamp = System.currentTimeMillis(); } public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, String codec) { - this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec, Collections.emptyMap()); + this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec, Collections.emptyMap(), System.currentTimeMillis()); } public ReplicationCheckpoint( @@ -68,7 +70,8 @@ public ReplicationCheckpoint( long segmentInfosVersion, long length, String codec, - Map metadataMap + Map metadataMap, + long createdTimeStamp ) { this.shardId = shardId; this.primaryTerm = primaryTerm; @@ -77,6 +80,7 @@ public ReplicationCheckpoint( this.length = length; this.codec = codec; this.metadataMap = metadataMap; + this.createdTimeStamp = createdTimeStamp; } public ReplicationCheckpoint(StreamInput in) throws IOException { @@ -96,6 +100,7 @@ public ReplicationCheckpoint(StreamInput in) throws IOException { } else { this.metadataMap = Collections.emptyMap(); } + this.createdTimeStamp = in.readLong(); } /** @@ -159,6 +164,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_10_0)) { out.writeMap(metadataMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut)); } + out.writeLong(createdTimeStamp); } @Override @@ -197,6 +203,10 @@ public Map getMetadataMap() { return metadataMap; } + public long getCreatedTimeStamp() { + return createdTimeStamp; + } + @Override public String toString() { return "ReplicationCheckpoint{" @@ -212,6 +222,8 @@ public String toString() { + length + ", codec=" + codec + + ", timestamp=" + + createdTimeStamp + '}'; } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 704a23890b07a..0c52571203594 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -949,7 +949,8 @@ protected Node( remoteStoreSettings, fileCache, compositeIndexSettings, - segmentReplicator::startReplication + segmentReplicator::startReplication, + segmentReplicator::getSegmentReplicationStats ); final IngestService ingestService = new IngestService( diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index bd86d3d396987..90f2b0b21cc8a 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -265,7 +265,8 @@ private IndexService newIndexService(IndexModule module) throws IOException { () -> IndexSettings.DEFAULT_REFRESH_INTERVAL, DefaultRecoverySettings.INSTANCE, DefaultRemoteStoreSettings.INSTANCE, - s -> {} + s -> {}, + null ); } diff --git a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java index 233a99cbe4a73..899e80965e4fd 100644 --- a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java @@ -1844,7 +1844,8 @@ public void testSegmentReplicationCheckpointTracking() { 1, 1L, Codec.getDefault().getName(), - Map.of("segment_1", segment_1) + Map.of("segment_1", segment_1), + 0L ); final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint( tracker.shardId(), @@ -1853,7 +1854,8 @@ public void testSegmentReplicationCheckpointTracking() { 2, 51L, Codec.getDefault().getName(), - Map.of("segment_1", segment_1, "segment_2", segment_2) + Map.of("segment_1", segment_1, "segment_2", segment_2), + 0L ); final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint( tracker.shardId(), @@ -1862,7 +1864,8 @@ public void testSegmentReplicationCheckpointTracking() { 3, 151L, Codec.getDefault().getName(), - Map.of("segment_1", segment_1, "segment_2", segment_2, "segment_3", segment_3) + Map.of("segment_1", segment_1, "segment_2", segment_2, "segment_3", segment_3), + 0L ); tracker.setLatestReplicationCheckpoint(initialCheckpoint); @@ -1974,7 +1977,8 @@ public void testSegmentReplicationCheckpointForRelocatingPrimary() { 1, 5L, Codec.getDefault().getName(), - Map.of("segment_1", segment_1) + Map.of("segment_1", segment_1), + 0L ); tracker.setLatestReplicationCheckpoint(initialCheckpoint); tracker.startReplicationLagTimers(initialCheckpoint); @@ -2033,7 +2037,8 @@ public void testSegmentReplicationCheckpointTrackingInvalidAllocationIDs() { 1, 1L, Codec.getDefault().getName(), - Collections.emptyMap() + Collections.emptyMap(), + 0L ); tracker.setLatestReplicationCheckpoint(initialCheckpoint); tracker.startReplicationLagTimers(initialCheckpoint); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 8b4b3aff701b4..fd26bdbf1068f 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -177,6 +177,9 @@ public void onFailure(Exception e) { logger.error("Unexpected onFailure", e); Assert.fail(); } + }, (ReplicationCheckpoint checkpoint, ShardId shardId) -> { + assertEquals(repCheckpoint, checkpoint); + assertEquals(shardId, spyIndexShard.shardId()); }); } @@ -230,7 +233,7 @@ public void onFailure(Exception e) { assertEquals(exception, e.getCause().getCause()); segrepTarget.fail(new ReplicationFailedException(e), false); } - }); + }, mock(BiConsumer.class)); } public void testFailureResponse_getSegmentFiles() { @@ -283,7 +286,7 @@ public void onFailure(Exception e) { assertEquals(exception, e.getCause().getCause()); segrepTarget.fail(new ReplicationFailedException(e), false); } - }); + }, mock(BiConsumer.class)); } public void testFailure_finalizeReplication_NonCorruptionException() throws IOException { @@ -330,7 +333,7 @@ public void onFailure(Exception e) { assertEquals(exception, e.getCause()); segrepTarget.fail(new ReplicationFailedException(e), false); } - }); + }, mock(BiConsumer.class)); } public void testFailure_finalizeReplication_IndexFormatException() throws IOException { @@ -376,7 +379,7 @@ public void onFailure(Exception e) { assertEquals(exception, e.getCause()); segrepTarget.fail(new ReplicationFailedException(e), false); } - }); + }, mock(BiConsumer.class)); } public void testFailure_differentSegmentFiles() throws IOException { @@ -429,7 +432,7 @@ public void onFailure(Exception e) { assertTrue(e.getMessage().contains("has local copies of segments that differ from the primary")); segrepTarget.fail(new ReplicationFailedException(e), false); } - }); + }, mock(BiConsumer.class)); } /** @@ -483,7 +486,7 @@ public void onFailure(Exception e) { logger.error("Unexpected onFailure", e); Assert.fail(); } - }); + }, mock(BiConsumer.class)); } /** diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java index 81ea16c80dd79..eda2ef58f4265 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java @@ -9,6 +9,8 @@ package org.opensearch.indices.replication; import org.apache.lucene.store.IOContext; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.Version; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; @@ -20,6 +22,8 @@ import org.opensearch.common.lucene.Lucene; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.shard.IndexShard; @@ -35,6 +39,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -200,6 +205,94 @@ public void getSegmentFiles( closeShards(primary, replica); } + public void testGetSegmentReplicationStats_WhenNoReplication() { + SegmentReplicator segmentReplicator = new SegmentReplicator(threadPool); + ShardId shardId = new ShardId("index", "uuid", 0); + ReplicationCheckpoint replicationCheckpoint = ReplicationCheckpoint.empty(shardId); + ReplicationStats replicationStats = segmentReplicator.getSegmentReplicationStats(shardId, replicationCheckpoint); + assertEquals(0, replicationStats.maxReplicationLag); + assertEquals(0, replicationStats.totalBytesBehind); + assertEquals(0, replicationStats.maxBytesBehind); + } + + public void testGetSegmentReplicationStats_WhenOnGoingReplication() { + SegmentReplicator segmentReplicator = new SegmentReplicator(threadPool); + ShardId shardId = new ShardId("index", "uuid", 0); + ReplicationCheckpoint firstReplicationCheckpoint = ReplicationCheckpoint.empty(shardId); + + BytesRef bytesRef = new BytesRef(500); + StoreFileMetadata storeFileMetadata1 = new StoreFileMetadata("test-1", 500, "1", Version.LATEST, bytesRef); + StoreFileMetadata storeFileMetadata2 = new StoreFileMetadata("test-2", 500, "1", Version.LATEST, bytesRef); + Map stringStoreFileMetadataMap = new HashMap<>(); + stringStoreFileMetadataMap.put("test-1", storeFileMetadata1); + stringStoreFileMetadataMap.put("test-2", storeFileMetadata2); + ReplicationCheckpoint secondReplicationCheckpoint = new ReplicationCheckpoint( + shardId, + 2, + 2, + 2, + 1000, + "", + stringStoreFileMetadataMap, + System.currentTimeMillis() + ); + + segmentReplicator.updateReplicationCheckpoints(secondReplicationCheckpoint, shardId); + + ReplicationStats replicationStats = segmentReplicator.getSegmentReplicationStats(shardId, firstReplicationCheckpoint); + assertEquals(1000, replicationStats.totalBytesBehind); + assertEquals(1000, replicationStats.maxBytesBehind); + // Since we use System.currentTimeMillis() directly inside the method, actual value will vary + assertTrue(replicationStats.maxReplicationLag > 0); + } + + public void testGetSegmentReplicationStats_WhileOnGoingReplicationPrimaryRefreshedToNewCheckPoint() { + SegmentReplicator segmentReplicator = new SegmentReplicator(threadPool); + ShardId shardId = new ShardId("index", "uuid", 0); + ReplicationCheckpoint firstReplicationCheckpoint = ReplicationCheckpoint.empty(shardId); + + BytesRef bytesRef = new BytesRef(500); + StoreFileMetadata storeFileMetadata1 = new StoreFileMetadata("test-1", 500, "1", Version.LATEST, bytesRef); + StoreFileMetadata storeFileMetadata2 = new StoreFileMetadata("test-2", 500, "1", Version.LATEST, bytesRef); + Map stringStoreFileMetadataMap = new HashMap<>(); + stringStoreFileMetadataMap.put("test-1", storeFileMetadata1); + stringStoreFileMetadataMap.put("test-2", storeFileMetadata2); + ReplicationCheckpoint secondReplicationCheckpoint = new ReplicationCheckpoint( + shardId, + 2, + 2, + 2, + 1000, + "", + stringStoreFileMetadataMap, + System.currentTimeMillis() + ); + + StoreFileMetadata storeFileMetadata3 = new StoreFileMetadata("test-3", 200, "1", Version.LATEST, bytesRef); + stringStoreFileMetadataMap.put("test-3", storeFileMetadata3); + + ReplicationCheckpoint thirdReplicationCheckpoint = new ReplicationCheckpoint( + shardId, + 3, + 3, + 3, + 1200, + "", + stringStoreFileMetadataMap, + System.currentTimeMillis() + ); + + segmentReplicator.updateReplicationCheckpoints(secondReplicationCheckpoint, shardId); + segmentReplicator.updatePrimaryLastRefreshedCheckpoint(thirdReplicationCheckpoint, shardId); + + ReplicationStats replicationStats = segmentReplicator.getSegmentReplicationStats(shardId, firstReplicationCheckpoint); + System.out.println(replicationStats); + assertEquals(1200, replicationStats.totalBytesBehind); + assertEquals(1200, replicationStats.maxBytesBehind); + // Since we use System.currentTimeMillis() directly inside the method, actual value will vary + assertTrue(replicationStats.maxReplicationLag > 0); + } + protected void resolveCheckpointListener(ActionListener listener, IndexShard primary) { try (final CopyState copyState = new CopyState(primary)) { listener.onResponse( diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index 0b30486038e3a..3b7c5560f89fb 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -81,7 +81,8 @@ public static IndexShard createMockIndexShard() throws IOException { 0L, 0L, Codec.getDefault().getName(), - SI_SNAPSHOT.asMap() + SI_SNAPSHOT.asMap(), + 0L ); final Tuple, ReplicationCheckpoint> gatedCloseableReplicationCheckpointTuple = new Tuple<>( new GatedCloseable<>(testSegmentInfos, () -> {}), diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 655a9eb7d5d38..66fafc23b4f60 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -37,6 +37,8 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; +import org.junit.Assert; +import org.mockito.Mockito; import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.admin.indices.flush.FlushRequest; @@ -86,6 +88,7 @@ import org.opensearch.env.TestEnvironment; import org.opensearch.index.IndexSettings; import org.opensearch.index.MapperTestUtils; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.VersionType; import org.opensearch.index.cache.IndexCache; import org.opensearch.index.cache.query.DisabledQueryCache; @@ -159,7 +162,6 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; -import org.junit.Assert; import java.io.IOException; import java.nio.file.Path; @@ -182,10 +184,6 @@ import java.util.function.Function; import java.util.stream.Collectors; -import org.mockito.Mockito; - -import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; -import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -194,6 +192,8 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; /** * A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily, @@ -688,6 +688,10 @@ protected IndexShard newShard( } return new InternalTranslogFactory(); }; + //This is fine since we are not testing the node stats now + BiFunction mockReplicationStatsProvider = mock(BiFunction.class); + when(mockReplicationStatsProvider.apply(any(), any())) + .thenReturn(new ReplicationStats(800, 800, 500)); indexShard = new IndexShard( routing, indexSettings, @@ -717,7 +721,8 @@ protected IndexShard newShard( DefaultRecoverySettings.INSTANCE, DefaultRemoteStoreSettings.INSTANCE, false, - discoveryNodes + discoveryNodes, + mockReplicationStatsProvider ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) { From 5d138e370e79e5a85f7abdaed7f281864072ee89 Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Sat, 18 Jan 2025 19:11:54 -0800 Subject: [PATCH 2/9] Updated style checks in the test Signed-off-by: Vinay Krishna Pudyodu --- .../opensearch/index/shard/IndexShardTestCase.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 66fafc23b4f60..bb91ba258e803 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -37,8 +37,6 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; -import org.junit.Assert; -import org.mockito.Mockito; import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.admin.indices.flush.FlushRequest; @@ -162,6 +160,7 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +import org.junit.Assert; import java.io.IOException; import java.nio.file.Path; @@ -184,6 +183,10 @@ import java.util.function.Function; import java.util.stream.Collectors; +import org.mockito.Mockito; + +import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -192,8 +195,6 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; -import static org.opensearch.test.ClusterServiceUtils.createClusterService; /** * A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily, @@ -688,10 +689,9 @@ protected IndexShard newShard( } return new InternalTranslogFactory(); }; - //This is fine since we are not testing the node stats now + // This is fine since we are not testing the node stats now BiFunction mockReplicationStatsProvider = mock(BiFunction.class); - when(mockReplicationStatsProvider.apply(any(), any())) - .thenReturn(new ReplicationStats(800, 800, 500)); + when(mockReplicationStatsProvider.apply(any(), any())).thenReturn(new ReplicationStats(800, 800, 500)); indexShard = new IndexShard( routing, indexSettings, From 18664d2a8fb6ab113e20300d05e78d27aa8a6b2d Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Sat, 18 Jan 2025 19:13:52 -0800 Subject: [PATCH 3/9] Updated changelog Signed-off-by: Vinay Krishna Pudyodu --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 45bc56b505eb3..0fbf6ba3bffa3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Introduce a setting to disable download of full cluster state from remote on term mismatch([#16798](https://github.com/opensearch-project/OpenSearch/pull/16798/)) - Added ability to retrieve value from DocValues in a flat_object filed([#16802](https://github.com/opensearch-project/OpenSearch/pull/16802)) - Introduce framework for auxiliary transports and an experimental gRPC transport plugin ([#16534](https://github.com/opensearch-project/OpenSearch/pull/16534)) +- Implemented computation of segment replication stats at shard level ([#17055](https://github.com/opensearch-project/OpenSearch/pull/17055)) ### Dependencies - Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504)) From 346816497811e0262a8323a7c6cc8dd68b4f537f Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Sat, 18 Jan 2025 19:34:25 -0800 Subject: [PATCH 4/9] fixed style issues Signed-off-by: Vinay Krishna Pudyodu --- .../opensearch/indices/replication/SegmentReplicatorTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java index eda2ef58f4265..83895800bd05c 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java @@ -286,7 +286,6 @@ public void testGetSegmentReplicationStats_WhileOnGoingReplicationPrimaryRefresh segmentReplicator.updatePrimaryLastRefreshedCheckpoint(thirdReplicationCheckpoint, shardId); ReplicationStats replicationStats = segmentReplicator.getSegmentReplicationStats(shardId, firstReplicationCheckpoint); - System.out.println(replicationStats); assertEquals(1200, replicationStats.totalBytesBehind); assertEquals(1200, replicationStats.maxBytesBehind); // Since we use System.currentTimeMillis() directly inside the method, actual value will vary From 4e693a5db9a4367ea7c97017b35bd8ecdca5af13 Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Tue, 21 Jan 2025 10:47:50 -0800 Subject: [PATCH 5/9] Fix the failing integration test Signed-off-by: Vinay Krishna Pudyodu --- .../indices/replication/SegmentReplicatorTests.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java index 83895800bd05c..63b5bbd94e503 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java @@ -11,6 +11,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Version; +import org.mockito.Mockito; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; @@ -46,8 +47,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import org.mockito.Mockito; - import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -242,8 +241,8 @@ public void testGetSegmentReplicationStats_WhenOnGoingReplication() { ReplicationStats replicationStats = segmentReplicator.getSegmentReplicationStats(shardId, firstReplicationCheckpoint); assertEquals(1000, replicationStats.totalBytesBehind); assertEquals(1000, replicationStats.maxBytesBehind); - // Since we use System.currentTimeMillis() directly inside the method, actual value will vary - assertTrue(replicationStats.maxReplicationLag > 0); + // Since we use System.currentTimeMillis() directly inside the getSegmentReplicationStats method, actual value will vary + // Although there is a way to mock the Clock skipping it here for the simplicity } public void testGetSegmentReplicationStats_WhileOnGoingReplicationPrimaryRefreshedToNewCheckPoint() { @@ -288,8 +287,8 @@ public void testGetSegmentReplicationStats_WhileOnGoingReplicationPrimaryRefresh ReplicationStats replicationStats = segmentReplicator.getSegmentReplicationStats(shardId, firstReplicationCheckpoint); assertEquals(1200, replicationStats.totalBytesBehind); assertEquals(1200, replicationStats.maxBytesBehind); - // Since we use System.currentTimeMillis() directly inside the method, actual value will vary - assertTrue(replicationStats.maxReplicationLag > 0); + // Since we use System.currentTimeMillis() directly inside the getSegmentReplicationStats method, actual value will vary + // Although there is a way to mock the Clock skipping it here for the simplicity } protected void resolveCheckpointListener(ActionListener listener, IndexShard primary) { From 5a3d1efa0a0fcc667adc50d941c53a75d30ae17b Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Tue, 21 Jan 2025 10:52:33 -0800 Subject: [PATCH 6/9] Fix stylecheck Signed-off-by: Vinay Krishna Pudyodu --- .../opensearch/indices/replication/SegmentReplicatorTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java index 63b5bbd94e503..8e4b59a7d869f 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java @@ -11,7 +11,6 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Version; -import org.mockito.Mockito; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; @@ -47,6 +46,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; +import org.mockito.Mockito; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; From a94240f61ab4cc3d61257d6ea74226b203a0a62c Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Tue, 21 Jan 2025 12:08:22 -0800 Subject: [PATCH 7/9] Fixed the comments for the initial revision Signed-off-by: Vinay Krishna Pudyodu --- .../replication/SegmentReplicationTarget.java | 4 ---- .../indices/replication/SegmentReplicator.java | 18 +++++++++++------- .../checkpoint/ReplicationCheckpoint.java | 10 ++++++++-- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 8dfc4f45bc3c9..0c75389a8eb70 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -141,10 +141,6 @@ public ReplicationCheckpoint getCheckpoint() { return this.checkpoint; } - public SegmentReplicationSource getSource() { - return source; - } - @Override public void writeFileChunk( StoreFileMetadata metadata, diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java index c540fa37607eb..d7666d9a100ed 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java @@ -13,7 +13,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CorruptIndexException; import org.opensearch.OpenSearchCorruptionException; -import org.opensearch.action.StepListener; import org.opensearch.common.SetOnce; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AbstractRunnable; @@ -226,12 +225,17 @@ private void fetchPrimaryLastRefreshedCheckpoint(SegmentReplicationTarget target return; } - final StepListener checkpointInfoListener = new StepListener<>(); - target.getSource().getCheckpointMetadata(target.getId(), target.getCheckpoint(), checkpointInfoListener); - checkpointInfoListener.whenComplete( - checkpointInfo -> updatePrimaryLastRefreshedCheckpoint(checkpointInfo.getCheckpoint(), target.indexShard().shardId()), - checkpointInfoListener::onFailure - ); + sourceFactory.get().get(target.indexShard()).getCheckpointMetadata(target.getId(), target.getCheckpoint(), new ActionListener<>() { + @Override + public void onResponse(CheckpointInfoResponse checkpointInfoResponse) { + updatePrimaryLastRefreshedCheckpoint(checkpointInfoResponse.getCheckpoint(), target.indexShard().shardId()); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to fetch primary last refreshed checkpoint", e); + } + }); } private boolean isStoreCorrupt(SegmentReplicationTarget target) { diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 009678b7502f8..af8c289214e47 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -100,7 +100,11 @@ public ReplicationCheckpoint(StreamInput in) throws IOException { } else { this.metadataMap = Collections.emptyMap(); } - this.createdTimeStamp = in.readLong(); + if (in.getVersion().onOrAfter(Version.V_2_19_0)) { + this.createdTimeStamp = in.readLong(); + } else { + this.createdTimeStamp = 0; + } } /** @@ -164,7 +168,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_10_0)) { out.writeMap(metadataMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut)); } - out.writeLong(createdTimeStamp); + if (out.getVersion().onOrAfter(Version.V_2_19_0)) { + out.writeLong(createdTimeStamp); + } } @Override From dd0406d60c7868ee626699296bd5de7d5b73497c Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Tue, 21 Jan 2025 12:23:52 -0800 Subject: [PATCH 8/9] Updated to use System.nanoTime() for lag calculation Signed-off-by: Vinay Krishna Pudyodu --- .../main/java/org/opensearch/index/shard/IndexShard.java | 2 +- .../opensearch/indices/replication/SegmentReplicator.java | 3 ++- .../replication/checkpoint/ReplicationCheckpoint.java | 4 ++-- .../indices/replication/SegmentReplicatorTests.java | 6 +++--- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index ca3aefd423984..e931d7944ffe3 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1788,7 +1788,7 @@ ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) th metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(), getEngine().config().getCodec().getName(), metadataMap, - System.currentTimeMillis() + System.nanoTime() ); logger.trace("Recomputed ReplicationCheckpoint for shard {}", checkpoint); return checkpoint; diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java index d7666d9a100ed..1a7fafc4a05cd 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; /** * This class is responsible for managing segment replication events on replicas. @@ -124,7 +125,7 @@ public ReplicationStats getSegmentReplicationStats(ShardId shardId, ReplicationC final ReplicationCheckpoint lastOnGoingReplicationCheckpoint = this.lastOnGoingReplicationCheckpoint.get(shardId); final long replicationLag = lastOnGoingReplicationCheckpoint != null - ? System.currentTimeMillis() - lastOnGoingReplicationCheckpoint.getCreatedTimeStamp() + ? TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastOnGoingReplicationCheckpoint.getCreatedTimeStamp()) : 0; return new ReplicationStats(bytesBehindSum, bytesBehindSum, bytesBehindSum > 0L ? replicationLag : 0); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index af8c289214e47..0fc5c0d42d4e2 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -56,11 +56,11 @@ private ReplicationCheckpoint(ShardId shardId, String codec) { length = 0L; this.codec = codec; this.metadataMap = Collections.emptyMap(); - this.createdTimeStamp = System.currentTimeMillis(); + this.createdTimeStamp = System.nanoTime(); } public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, String codec) { - this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec, Collections.emptyMap(), System.currentTimeMillis()); + this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec, Collections.emptyMap(), System.nanoTime()); } public ReplicationCheckpoint( diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java index 8e4b59a7d869f..fee5cc90f4df3 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java @@ -234,7 +234,7 @@ public void testGetSegmentReplicationStats_WhenOnGoingReplication() { 1000, "", stringStoreFileMetadataMap, - System.currentTimeMillis() + System.nanoTime() ); segmentReplicator.updateReplicationCheckpoints(secondReplicationCheckpoint, shardId); @@ -265,7 +265,7 @@ public void testGetSegmentReplicationStats_WhileOnGoingReplicationPrimaryRefresh 1000, "", stringStoreFileMetadataMap, - System.currentTimeMillis() + System.nanoTime() ); StoreFileMetadata storeFileMetadata3 = new StoreFileMetadata("test-3", 200, "1", Version.LATEST, bytesRef); @@ -279,7 +279,7 @@ public void testGetSegmentReplicationStats_WhileOnGoingReplicationPrimaryRefresh 1200, "", stringStoreFileMetadataMap, - System.currentTimeMillis() + System.nanoTime() ); segmentReplicator.updateReplicationCheckpoints(secondReplicationCheckpoint, shardId); From 1104c1f294ddbd62215e62884a01cb2b4bee41a8 Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Tue, 21 Jan 2025 22:33:21 -0800 Subject: [PATCH 9/9] Fixed the integration test for node stats Signed-off-by: Vinay Krishna Pudyodu --- .../replication/SegmentReplicationStatsIT.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java index 89aef6f0be1a6..f5abc66ab90a1 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java @@ -404,19 +404,17 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception { for (NodeStats nodeStats : nodesStatsResponse.getNodes()) { ReplicationStats replicationStats = nodeStats.getIndices().getSegments().getReplicationStats(); - // primary node - should hold replication statistics + // primary node - do not have any replication statistics if (nodeStats.getNode().getName().equals(primaryNode)) { - assertTrue(replicationStats.getMaxBytesBehind() > 0); - assertTrue(replicationStats.getTotalBytesBehind() > 0); - assertTrue(replicationStats.getMaxReplicationLag() > 0); - // 2 replicas so total bytes should be double of max - assertEquals(replicationStats.getMaxBytesBehind() * 2, replicationStats.getTotalBytesBehind()); + assertTrue(replicationStats.getMaxBytesBehind() == 0); + assertTrue(replicationStats.getTotalBytesBehind() == 0); + assertTrue(replicationStats.getMaxReplicationLag() == 0); } // replica nodes - should hold empty replication statistics if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) { - assertEquals(0, replicationStats.getMaxBytesBehind()); - assertEquals(0, replicationStats.getTotalBytesBehind()); - assertEquals(0, replicationStats.getMaxReplicationLag()); + assertTrue(replicationStats.getMaxBytesBehind() > 0); + assertTrue(replicationStats.getTotalBytesBehind() > 0); + assertTrue(replicationStats.getMaxReplicationLag() > 0); } } // get replication statistics at index level