diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index 3a1ab73b398e3..c949ffffdf656 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -56,10 +56,6 @@ */ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService { - public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing"; - public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing"; - public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--"; - private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class); private final Settings settings; private final Supplier repositoriesService; @@ -126,14 +122,13 @@ public DiffableUtils.MapDiff getAsyncIndexRoutingWriteAction( ClusterState clusterState, - String clusterUUID, IndexRoutingTable indexRouting, LatchedActionListener latchedActionListener ) { RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( indexRouting, - clusterUUID, + clusterState.metadata().clusterUUID(), compressor, clusterState.term(), clusterState.version() @@ -177,7 +172,6 @@ public List getAllUploadedIndices public CheckedRunnable getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, - Index index, LatchedActionListener latchedActionListener ) { @@ -186,7 +180,7 @@ public CheckedRunnable getAsyncIndexRoutingReadAction( latchedActionListener::onFailure ); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, index, clusterUUID, compressor); + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor); return () -> remoteWritableEntityStore.readAsync(remoteIndexRoutingTable, actionListener); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java index fde4f381b3e8a..b4c3411c6e451 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java @@ -43,7 +43,6 @@ public DiffableUtils.MapDiff getAsyncIndexRoutingWriteAction( ClusterState clusterState, - String clusterUUID, IndexRoutingTable indexRouting, LatchedActionListener latchedActionListener ) { @@ -65,7 +64,6 @@ public List getAllUploadedIndices public CheckedRunnable getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, - Index index, LatchedActionListener latchedActionListener ) { // noop diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 823487132c132..cd40bb71cecca 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -48,7 +48,6 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException { CheckedRunnable getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, - Index index, LatchedActionListener latchedActionListener ); @@ -64,7 +63,6 @@ DiffableUtils.MapDiff> CheckedRunnable getAsyncIndexRoutingWriteAction( ClusterState clusterState, - String clusterUUID, IndexRoutingTable indexRouting, LatchedActionListener latchedActionListener ); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index d6edfdc25b7bf..06b69ff596b3a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -106,6 +106,8 @@ import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -656,10 +658,9 @@ private UploadedMetadataResults writeMetadataInParallel( }); indicesRoutingToUpload.forEach(indexRoutingTable -> { uploadTasks.put( - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(), + String.join(CUSTOM_DELIMITER, INDEX_ROUTING_TABLE, indexRoutingTable.getIndex().getName()), remoteRoutingTableService.getAsyncIndexRoutingWriteAction( clusterState, - clusterState.metadata().clusterUUID(), indexRoutingTable, listener ) @@ -712,7 +713,7 @@ private UploadedMetadataResults writeMetadataInParallel( UploadedMetadataResults response = new UploadedMetadataResults(); results.forEach((name, uploadedMetadata) -> { if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class) - && uploadedMetadata.getComponent().contains(InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX)) { + && uploadedMetadata.getComponent().contains(INDEX_ROUTING_TABLE_PREFIX)) { response.uploadedIndicesRoutingMetadata.add((UploadedIndexMetadata) uploadedMetadata); } else if (name.startsWith(CUSTOM_METADATA)) { // component name for custom metadata will look like custom-- @@ -1037,7 +1038,6 @@ private ClusterState readClusterStateInParallel( remoteRoutingTableService.getAsyncIndexRoutingReadAction( clusterUUID, indexRouting.getUploadedFilename(), - new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()), routingTableLatchedActionListener ) ); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java index 17354d16c1b0f..ecadb3159bb9f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java @@ -21,11 +21,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; - -import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; /** * Extends the RemoteClusterStateBlobStore to support {@link RemoteIndexRoutingTable} @@ -60,7 +56,6 @@ public class RemoteRoutingTableBlobStore { - public static final String INDEX_ROUTING_TABLE = "index_routing_table"; + public static final String INDEX_ROUTING_TABLE = "index-routing"; + public static final String INDEX_ROUTING_TABLE_PREFIX = "index-routing--"; private IndexRoutingTable indexRoutingTable; private final Index index; private long term; @@ -58,17 +58,17 @@ public RemoteIndexRoutingTable( this.indexRoutingTable = indexRoutingTable; this.term = term; this.version = version; - this.blobFileName = generateBlobFileName(); } /** * Reads data from inputStream and creates RemoteIndexRoutingTable object with the {@link IndexRoutingTable} * @param blobName name of the blob, which contains the index routing data - * @param index index for the current routing data + * @param clusterUUID UUID of the cluster + * @param compressor Compressor object */ - public RemoteIndexRoutingTable(String blobName, Index index, String clusterUUID, Compressor compressor) { + public RemoteIndexRoutingTable(String blobName, String clusterUUID, Compressor compressor) { super(clusterUUID, compressor, null); - this.index = index; + this.index = null; this.term = -1; this.version = -1; this.blobName = blobName; @@ -96,7 +96,7 @@ public String getType() { public String generateBlobFileName() { return String.join( DELIMITER, - INDEX_ROUTING_FILE_PREFIX, + INDEX_ROUTING_TABLE_PREFIX, RemoteStoreUtils.invertLong(term), RemoteStoreUtils.invertLong(version), RemoteStoreUtils.invertLong(System.currentTimeMillis()) @@ -105,7 +105,9 @@ public String generateBlobFileName() { @Override public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { - return new ClusterMetadataManifest.UploadedIndexMetadata(index.getName(), index.getUUID(), blobName, INDEX_ROUTING_METADATA_PREFIX); + assert blobName != null; + assert index != null; + return new ClusterMetadataManifest.UploadedIndexMetadata(index.getName(), index.getUUID(), blobName, INDEX_ROUTING_TABLE_PREFIX); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java index 39294ee8da41e..cb02995d56ce9 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java @@ -11,7 +11,11 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -20,6 +24,7 @@ import java.util.function.Supplier; +import static org.mockito.Mockito.mock; import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; @@ -36,11 +41,18 @@ public void teardown() throws Exception { public void testGetServiceWhenRemoteRoutingDisabled() { Settings settings = Settings.builder().build(); + BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class); + Compressor compressor = new NoneCompressor(); + BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class); RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService( repositoriesService, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + compressor, + blobStoreTransferService, + blobStoreRepository, + "test-cluster" ); assertTrue(service instanceof NoopRemoteRoutingTableService); } @@ -50,13 +62,20 @@ public void testGetServiceWhenRemoteRoutingEnabled() { .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") .put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false) .build(); + BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class); + Compressor compressor = new NoneCompressor(); + BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService( repositoriesService, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + compressor, + blobStoreTransferService, + blobStoreRepository, + "test-cluster" ); assertTrue(service instanceof InternalRemoteRoutingTableService); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 839ebe1ff8301..af31545e0a331 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -33,6 +33,8 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteStateTransferException; @@ -40,6 +42,7 @@ import org.opensearch.index.remote.RemoteStoreEnums; import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.FilterRepository; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryMissingException; @@ -62,11 +65,11 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_FILE_PREFIX; -import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN; import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.gateway.remote.ClusterMetadataManifestTests.randomUploadedIndexMetadataList; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -92,6 +95,8 @@ public class RemoteRoutingTableServiceTests extends OpenSearchTestCase { private BlobPath basePath; private ClusterSettings clusterSettings; private ClusterService clusterService; + private Compressor compressor; + private BlobStoreTransferService blobStoreTransferService; private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before @@ -105,6 +110,7 @@ public void setup() { .build(); clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); clusterService = mock(ClusterService.class); + blobStoreTransferService = mock(BlobStoreTransferService.class); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); blobStoreRepository = mock(BlobStoreRepository.class); when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); @@ -114,14 +120,18 @@ public void setup() { when(blobStoreRepository.blobStore()).thenReturn(blobStore); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); - + compressor = new NoneCompressor(); basePath = BlobPath.cleanPath().add("base-path"); remoteRoutingTableService = new InternalRemoteRoutingTableService( repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + compressor, + blobStoreTransferService, + blobStoreRepository, + "test-cluster" ); } @@ -141,7 +151,11 @@ public void testFailInitializationWhenRemoteRoutingDisabled() { repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + compressor, + blobStoreTransferService, + blobStoreRepository, + "test-cluster" ) ); } @@ -356,18 +370,17 @@ public void testGetIndexRoutingAsyncAction() throws IOException { when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( + CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingWriteAction( clusterState, clusterState.routingTable().getIndicesRouting().get(indexName), - listener, - basePath + listener ); assertNotNull(runnable); runnable.run(); String expectedFilePrefix = String.join( DELIMITER, - INDEX_ROUTING_FILE_PREFIX, + INDEX_ROUTING_TABLE_PREFIX, RemoteStoreUtils.invertLong(clusterState.term()), RemoteStoreUtils.invertLong(clusterState.version()) ); @@ -385,17 +398,16 @@ public void testGetIndexRoutingAsyncActionFailureInBlobRepo() throws IOException doThrow(new IOException("testing failure")).when(blobContainer).writeBlob(anyString(), any(StreamInput.class), anyLong(), eq(true)); remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( + CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingWriteAction( clusterState, clusterState.routingTable().getIndicesRouting().get(indexName), - listener, - basePath + listener ); assertNotNull(runnable); runnable.run(); String expectedFilePrefix = String.join( DELIMITER, - INDEX_ROUTING_FILE_PREFIX, + INDEX_ROUTING_TABLE_PREFIX, RemoteStoreUtils.invertLong(clusterState.term()), RemoteStoreUtils.invertLong(clusterState.version()) ); @@ -424,18 +436,17 @@ public void testGetIndexRoutingAsyncActionAsyncRepo() throws IOException { .asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( + CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingWriteAction( clusterState, clusterState.routingTable().getIndicesRouting().get(indexName), - listener, - basePath + listener ); assertNotNull(runnable); runnable.run(); String expectedFilePrefix = String.join( DELIMITER, - INDEX_ROUTING_FILE_PREFIX, + INDEX_ROUTING_TABLE_PREFIX, RemoteStoreUtils.invertLong(clusterState.term()), RemoteStoreUtils.invertLong(clusterState.version()) ); @@ -459,11 +470,10 @@ public void testGetIndexRoutingAsyncActionAsyncRepoFailureInRepo() throws IOExce .asyncBlobUpload(any(WriteContext.class), any(ActionListener.class)); remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( + CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingWriteAction( clusterState, clusterState.routingTable().getIndicesRouting().get(indexName), - listener, - basePath + listener ); assertNotNull(runnable); runnable.run(); @@ -476,7 +486,7 @@ public void testGetAllUploadedIndicesRouting() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); List allIndiceRoutingMetadata = remoteRoutingTableService @@ -491,7 +501,7 @@ public void testGetAllUploadedIndicesRoutingExistingIndexInManifest() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata)) @@ -509,7 +519,7 @@ public void testGetAllUploadedIndicesRoutingNewIndexFromManifest() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata)) @@ -518,7 +528,7 @@ public void testGetAllUploadedIndicesRoutingNewIndexFromManifest() { "test-index2", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); List allIndiceRoutingMetadata = remoteRoutingTableService @@ -534,13 +544,13 @@ public void testGetAllUploadedIndicesRoutingIndexDeleted() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata2 = new ClusterMetadataManifest.UploadedIndexMetadata( "test-index2", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata, uploadedIndexMetadata2)) @@ -558,13 +568,13 @@ public void testGetAllUploadedIndicesRoutingNoChange() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata2 = new ClusterMetadataManifest.UploadedIndexMetadata( "test-index2", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata, uploadedIndexMetadata2)) @@ -650,13 +660,16 @@ public void testGetAsyncIndexMetadataReadAction() throws Exception { when(blobStore.blobContainer(any())).thenReturn(blobContainer); BytesStreamOutput streamOutput = new BytesStreamOutput(); RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - clusterState.routingTable().getIndicesRouting().get(indexName) + clusterState.routingTable().getIndicesRouting().get(indexName), + "cluster-uuid", + compressor, + 3L,2L ); - remoteIndexRoutingTable.writeTo(streamOutput); - when(blobContainer.readBlob(indexName)).thenReturn(streamOutput.bytes().streamInput()); + when(blobContainer.readBlob(indexName)).thenReturn(remoteIndexRoutingTable.serialize()); remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); + CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction( + "cluster-uuid", uploadedFileName, listener); assertNotNull(runnable); runnable.run(); @@ -674,13 +687,16 @@ public void testGetAsyncIndexMetadataReadActionFailureForIncorrectIndex() throws when(blobStore.blobContainer(any())).thenReturn(blobContainer); BytesStreamOutput streamOutput = new BytesStreamOutput(); RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - clusterState.routingTable().getIndicesRouting().get(indexName) + clusterState.routingTable().getIndicesRouting().get(indexName), + "cluster-uuid", + compressor, + 3L,2L ); - remoteIndexRoutingTable.writeTo(streamOutput); - when(blobContainer.readBlob(anyString())).thenReturn(streamOutput.bytes().streamInput()); + when(blobContainer.readBlob(anyString())).thenReturn(remoteIndexRoutingTable.serialize()); remoteRoutingTableService.doStart(); - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); + CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction( + "cluster-uuid", uploadedFileName, listener); assertNotNull(runnable); runnable.run(); @@ -691,14 +707,13 @@ public void testGetAsyncIndexMetadataReadActionFailureForIncorrectIndex() throws public void testGetAsyncIndexMetadataReadActionFailureInBlobRepo() throws Exception { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); - Index index = new Index(indexName, "uuid-01"); LatchedActionListener listener = mock(LatchedActionListener.class); when(blobStore.blobContainer(any())).thenReturn(blobContainer); doThrow(new IOException("testing failure")).when(blobContainer).readBlob(indexName); remoteRoutingTableService.doStart(); - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); + CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction("cluster-uuid", uploadedFileName, listener); assertNotNull(runnable); runnable.run(); @@ -758,7 +773,7 @@ private ClusterState createClusterState(String indexName) { } private BlobPath getPath() { - BlobPath indexRoutingPath = basePath.add(INDEX_ROUTING_PATH_TOKEN); + BlobPath indexRoutingPath = basePath.add(INDEX_ROUTING_TABLE); return RemoteStoreEnums.PathType.HASHED_PREFIX.path( RemoteStorePathStrategy.PathInput.builder().basePath(indexRoutingPath).indexUUID("uuid").build(), RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64 diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 02471c9cdbbbe..af09bb0743df1 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -44,6 +44,7 @@ import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; public class ClusterMetadataManifestTests extends OpenSearchTestCase { @@ -545,7 +546,7 @@ public void testClusterMetadataManifestXContentV2WithoutEphemeral() throws IOExc "test-index", "test-uuid", "routing-path", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() .clusterTerm(1L) diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index c8fd982fec1e1..46dc38378a72b 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -96,6 +96,7 @@ import static java.util.stream.Collectors.toList; import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getFormattedIndexFileName; @@ -107,6 +108,7 @@ import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA_FORMAT; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; @@ -1426,7 +1428,7 @@ public void testWriteFullMetadataSuccessWithRoutingTable() throws IOException { "test-index", "index-uuid", "routing-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE + CUSTOM_DELIMITER ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .indices(List.of(uploadedIndexMetadata)) @@ -1477,7 +1479,7 @@ public void testWriteFullMetadataInParallelSuccessWithRoutingTable() throws IOEx "test-index", "index-uuid", "routing-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE + CUSTOM_DELIMITER ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() @@ -1532,7 +1534,7 @@ public void testWriteIncrementalMetadataSuccessWithRoutingTable() throws IOExcep "test-index", "index-uuid", "routing-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE + CUSTOM_DELIMITER ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .indices(List.of(uploadedIndexMetadata)) diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java deleted file mode 100644 index a3f0ac36a40f1..0000000000000 --- a/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway.remote.routingtable; - -import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.core.common.io.stream.BytesStreamInput; -import org.opensearch.test.OpenSearchTestCase; - -import java.io.IOException; - -public class IndexRoutingTableHeaderTests extends OpenSearchTestCase { - - public void testIndexRoutingTableHeader() throws IOException { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - IndexRoutingTableHeader header = new IndexRoutingTableHeader(indexName); - try (BytesStreamOutput out = new BytesStreamOutput()) { - header.writeTo(out); - - BytesStreamInput in = new BytesStreamInput(out.bytes().toBytesRef().bytes); - IndexRoutingTableHeader headerRead = new IndexRoutingTableHeader(in); - assertEquals(indexName, headerRead.getIndexName()); - - } - } - -} diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java index 72066d8afb45b..49acca9a7e5f6 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java @@ -8,21 +8,156 @@ package org.opensearch.gateway.remote.routingtable; +import org.junit.After; +import org.junit.Before; import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.SnapshotsInProgress; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; +import java.io.InputStream; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; + public class RemoteIndexRoutingTableTests extends OpenSearchTestCase { - public void testRoutingTableInput() { + private static final String TEST_BLOB_NAME = "/test-path/test-blob-name"; + private static final String TEST_BLOB_PATH = "test-path"; + private static final String TEST_BLOB_FILE_NAME = "test-blob-name"; + private static final String INDEX_ROUTING_TABLE_TYPE = "test-index-routing-table"; + private static final long STATE_VERSION = 3L; + private static final long STATE_TERM = 2L; + private String clusterUUID; + private BlobStoreTransferService blobStoreTransferService; + private BlobStoreRepository blobStoreRepository; + private String clusterName; + private ClusterSettings clusterSettings; + private Compressor compressor; + private NamedWriteableRegistry namedWriteableRegistry; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + this.clusterUUID = "test-cluster-uuid"; + this.blobStoreTransferService = mock(BlobStoreTransferService.class); + this.blobStoreRepository = mock(BlobStoreRepository.class); + BlobPath blobPath = new BlobPath().add("/path"); + when(blobStoreRepository.basePath()).thenReturn(blobPath); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + compressor = new NoneCompressor(); + namedWriteableRegistry = writableRegistry(); + this.clusterName = "test-cluster-name"; + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } + + public void testClusterUUID() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertEquals(remoteObjectForUpload.clusterUUID(), clusterUUID); + + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable( + TEST_BLOB_NAME, + clusterUUID, + compressor + ); + assertEquals(remoteObjectForDownload.clusterUUID(), clusterUUID); + }); + } + + public void testFullBlobName() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertThat(remoteObjectForUpload.getFullBlobName(), nullValue()); + + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable( + TEST_BLOB_NAME, + clusterUUID, + compressor + ); + assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); + }); + } + + public void testBlobFileName() { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); int numberOfShards = randomIntBetween(1, 10); int numberOfReplicas = randomIntBetween(1, 10); @@ -37,51 +172,172 @@ public void testRoutingTableInput() { RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); - initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> { - RemoteIndexRoutingTable indexRouting = new RemoteIndexRoutingTable(indexShardRoutingTables); - try (BytesStreamOutput streamOutput = new BytesStreamOutput();) { - indexRouting.writeTo(streamOutput); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - streamOutput.bytes().streamInput(), - metadata.index(indexName).getIndex() - ); - IndexRoutingTable indexRoutingTable = remoteIndexRoutingTable.getIndexRoutingTable(); - assertEquals(numberOfShards, indexRoutingTable.getShards().size()); - assertEquals(metadata.index(indexName).getIndex(), indexRoutingTable.getIndex()); - assertEquals( - numberOfShards * (1 + numberOfReplicas), - indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED).size() - ); + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); + + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable( + TEST_BLOB_NAME, + clusterUUID, + compressor + ); + assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); + }); + } + + public void testBlobPathTokens() { + String uploadedFile = "user/local/opensearch/routingTable"; + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable( + uploadedFile, + clusterUUID, + compressor + ); + assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "routingTable" })); + } + + public void testBlobPathParameters() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); + + BlobPathParameters params = remoteObjectForUpload.getBlobPathParameters(); + assertThat(params.getPathTokens(), is(List.of(indexRoutingTable.getIndex().getUUID()))); + String expectedPrefix = ""; + assertThat(params.getFilePrefix(), is(expectedPrefix)); + }); + } + + public void testGenerateBlobFileName() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + + String blobFileName = remoteObjectForUpload.generateBlobFileName(); + String[] nameTokens = blobFileName.split(RemoteClusterStateUtils.DELIMITER); + assertEquals(nameTokens[0], INDEX_ROUTING_TABLE_PREFIX); + assertEquals(nameTokens[1], RemoteStoreUtils.invertLong(STATE_TERM)); + assertEquals(nameTokens[2], RemoteStoreUtils.invertLong(STATE_VERSION)); + assertThat(RemoteStoreUtils.invertLong(nameTokens[3]), lessThanOrEqualTo(System.currentTimeMillis())); + }); + } + + public void testGetUploadedMetadata() throws IOException { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + + assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(new BlobPath().add(TEST_BLOB_PATH)); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = remoteObjectForUpload.getUploadedMetadata(); + String expectedPrefix = String.join(CUSTOM_DELIMITER, INDEX_ROUTING_TABLE, indexRoutingTable.getIndex().getName()); + assertThat(uploadedMetadata.getComponent(), is(expectedPrefix)); + assertThat(uploadedMetadata.getUploadedFilename(), is(remoteObjectForUpload.getFullBlobName())); } catch (IOException e) { throw new RuntimeException(e); } }); } - public void testRoutingTableInputStreamWithInvalidIndex() { + public void testSerDe() throws IOException { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); Metadata metadata = Metadata.builder() - .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) - .put(IndexMetadata.builder("invalid-index").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) .build(); - RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); - AtomicInteger assertionError = new AtomicInteger(); - initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> { - RemoteIndexRoutingTable indexRouting = new RemoteIndexRoutingTable(indexShardRoutingTables); - try (BytesStreamOutput streamOutput = new BytesStreamOutput()) { - indexRouting.writeTo(streamOutput); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - streamOutput.bytes().streamInput(), - metadata.index("invalid-index").getIndex() - ); - } catch (AssertionError e) { - assertionError.getAndIncrement(); + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + + assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath()); + assertThat(inputStream.available(), greaterThan(0)); + IndexRoutingTable readIndexRoutingTable = remoteObjectForUpload.deserialize(inputStream); + assertEquals(readIndexRoutingTable, indexRoutingTable); } catch (IOException e) { throw new RuntimeException(e); } }); - - assertEquals(1, assertionError.get()); } - }