From 7799b6440ff74613fb2c8bd633ed5effc1b18034 Mon Sep 17 00:00:00 2001 From: Shailendra Singh Date: Sun, 9 Jun 2024 10:09:35 +0530 Subject: [PATCH] Delete stale index routing files. Signed-off-by: Shailendra Singh --- .../DefaultRemoteRoutingTableService.java | 3 + .../InternalRemoteRoutingTableService.java | 13 +++ .../remote/NoopRemoteRoutingTableService.java | 5 ++ .../remote/RemoteRoutingTableService.java | 2 + .../RemoteClusterStateCleanupManager.java | 30 ++++++- .../remote/RemoteClusterStateService.java | 1 - .../RemoteRoutingTableServiceTests.java | 32 +++++++ ...RemoteClusterStateCleanupManagerTests.java | 88 ++++++++++++++++++- 8 files changed, 168 insertions(+), 6 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/remote/DefaultRemoteRoutingTableService.java diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/DefaultRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/DefaultRemoteRoutingTableService.java new file mode 100644 index 0000000000000..04b7e42c68370 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/DefaultRemoteRoutingTableService.java @@ -0,0 +1,3 @@ +package org.opensearch.cluster.routing.remote; + +public class DefaultRemoteRoutingTableService {} diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index dcf106914c571..c0ea776c8101c 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -297,6 +297,19 @@ protected void doStart() { blobStoreRepository = (BlobStoreRepository) repository; } + @Override + public void deleteStaleIndexRoutingPaths(List stalePaths) throws IOException { + try { + System.out.println("Deleting stale index routing files from remote - " + stalePaths); + logger.debug(() -> "Deleting stale index routing files from remote - " + stalePaths); + + blobStoreRepository.blobStore().blobContainer(BlobPath.cleanPath()).deleteBlobsIgnoringIfNotExists(stalePaths); + } catch (IOException e) { + logger.error("Error while deleting stale index routing files", e); + throw e; + } + } + @Override protected void doStop() {} diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java index b52c00f1f8576..95688c55c6519 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java @@ -74,4 +74,9 @@ protected void doStop() { protected void doClose() throws IOException { // noop } + + @Override + public void deleteStaleIndexRoutingPaths(List stalePaths) throws IOException { + // noop + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index dbf01904116ed..b387be17b2d44 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -61,4 +61,6 @@ List getAllUploadedIndicesRouting List indicesRoutingToDelete ); + void deleteStaleIndexRoutingPaths(List stalePaths) throws IOException; + } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java index 2fca239b10efd..39bc28fcce4ab 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Strings; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.BlobMetadata; @@ -31,6 +32,7 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -74,8 +76,13 @@ public class RemoteClusterStateCleanupManager implements Closeable { private long lastCleanupAttemptStateVersion; private final ThreadPool threadpool; private final ClusterApplierService clusterApplierService; + private final Optional remoteRoutingTableService; - public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) { + public RemoteClusterStateCleanupManager( + RemoteClusterStateService remoteClusterStateService, + ClusterService clusterService, + Optional remoteRoutingTableService + ) { this.remoteClusterStateService = remoteClusterStateService; this.remoteStateStats = remoteClusterStateService.getStats(); ClusterSettings clusterSettings = clusterService.getClusterSettings(); @@ -84,6 +91,7 @@ public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterS this.threadpool = remoteClusterStateService.getThreadpool(); // initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold this.lastCleanupAttemptStateVersion = 0; + this.remoteRoutingTableService = remoteRoutingTableService; clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval); } @@ -171,6 +179,7 @@ void deleteClusterMetadata( Set staleManifestPaths = new HashSet<>(); Set staleIndexMetadataPaths = new HashSet<>(); Set staleGlobalMetadataPaths = new HashSet<>(); + Set staleIndexRoutingPaths = new HashSet<>(); activeManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest( clusterName, @@ -189,6 +198,10 @@ void deleteClusterMetadata( .values() .forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename())); } + if (remoteRoutingTableService.isPresent() && clusterMetadataManifest.getIndicesRouting() != null) { + clusterMetadataManifest.getIndicesRouting() + .forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename())); + } }); staleManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest( @@ -221,6 +234,14 @@ void deleteClusterMetadata( attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths) ); } + if (remoteRoutingTableService.isPresent() && clusterMetadataManifest.getIndicesRouting() != null) { + clusterMetadataManifest.getIndicesRouting().forEach(uploadedIndicesRouting -> { + if (!filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename())) { + staleIndexRoutingPaths.add(uploadedIndicesRouting.getUploadedFilename()); + logger.debug(() -> "Indices routing paths in stale manifest: " + uploadedIndicesRouting.getUploadedFilename()); + } + }); + } clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { @@ -240,6 +261,13 @@ void deleteClusterMetadata( deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths)); deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths)); deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths)); + remoteRoutingTableService.ifPresent(remoteroutingTableService -> { + try { + remoteroutingTableService.deleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } catch (IllegalStateException e) { logger.error("Error while fetching Remote Cluster Metadata manifests", e); } catch (IOException e) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 1617efd6200c3..8ea6b97e27eaf 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -264,7 +264,6 @@ public RemoteClusterStateService( clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout); this.remoteStateStats = new RemotePersistenceStats(); - this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService); this.indexMetadataUploadListeners = indexMetadataUploadListeners; this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(repositoriesService, settings, clusterSettings); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 8fd410e774332..e3a9fcbd289c6 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -47,12 +47,14 @@ import org.junit.Before; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_FILE_PREFIX; import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN; @@ -65,6 +67,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.startsWith; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -74,6 +77,9 @@ public class RemoteRoutingTableServiceTests extends OpenSearchTestCase { private InternalRemoteRoutingTableService remoteRoutingTableService; + // private RemoteRoutingTableService remoteRoutingTableService; + + private ClusterSettings clusterSettings; private Supplier repositoriesServiceSupplier; private RepositoriesService repositoriesService; private BlobStoreRepository blobStoreRepository; @@ -92,6 +98,8 @@ public void setup() { .put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false) .build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + blobStoreRepository = mock(BlobStoreRepository.class); when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); blobStore = mock(BlobStore.class); @@ -99,6 +107,7 @@ public void setup() { when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository); when(blobStoreRepository.blobStore()).thenReturn(blobStore); + when(blobStoreRepository.blobStore()).thenReturn(blobStore); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); @@ -552,4 +561,27 @@ private BlobPath getPath() { RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64 ); } + + public void testDeleteStaleIndexRoutingPaths() throws IOException { + doNothing().when(blobContainer).deleteBlobsIgnoringIfNotExists(any()); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + List stalePaths = Arrays.asList("path1", "path2"); + remoteRoutingTableService.doStart(); + remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths); + verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths); + } + + public void testDeleteStaleIndexRoutingPathsThrowsIOException() throws IOException { + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + List stalePaths = Arrays.asList("path1", "path2"); + // Simulate an IOException + doThrow(new IOException("test exception")).when(blobContainer).deleteBlobsIgnoringIfNotExists(Mockito.anyList()); + + remoteRoutingTableService.doStart(); + IOException thrown = assertThrows(IOException.class, () -> { + remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths); + }); + assertEquals("test exception", thrown.getMessage()); + verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths); + } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java index 24fd1b164a4ff..8ad327e5415f5 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java @@ -12,6 +12,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.BlobContainer; @@ -39,6 +40,7 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -47,6 +49,7 @@ import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V3; import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.AsyncStaleFileDeletion; @@ -76,6 +79,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -92,6 +96,9 @@ public class RemoteClusterStateCleanupManagerTests extends OpenSearchTestCase { private ClusterState clusterState; private Metadata metadata; private RemoteClusterStateService remoteClusterStateService; + private Optional remoteRoutingTableService; + + private RemoteRoutingTableService mockedRemoteRoutingTableService; private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before @@ -139,7 +146,14 @@ public void setup() { when(remoteClusterStateService.getStats()).thenReturn(new RemotePersistenceStats()); when(remoteClusterStateService.getThreadpool()).thenReturn(threadPool); when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore); - remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService); + mockedRemoteRoutingTableService = mock(RemoteRoutingTableService.class); + remoteRoutingTableService = Optional.of(mockedRemoteRoutingTableService); + when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore); + remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager( + remoteClusterStateService, + clusterService, + remoteRoutingTableService + ); } @After @@ -155,11 +169,13 @@ public void testDeleteClusterMetadata() throws IOException { List inactiveBlobs = Arrays.asList( new PlainBlobMetadata("manifest1.dat", 1L), new PlainBlobMetadata("manifest2.dat", 1L), - new PlainBlobMetadata("manifest3.dat", 1L) + new PlainBlobMetadata("manifest3.dat", 1L), + new PlainBlobMetadata("manifest6.dat", 1L) ); List activeBlobs = Arrays.asList( new PlainBlobMetadata("manifest4.dat", 1L), - new PlainBlobMetadata("manifest5.dat", 1L) + new PlainBlobMetadata("manifest5.dat", 1L), + new PlainBlobMetadata("manifest7.dat", 1L) ); UploadedIndexMetadata index1Metadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1"); UploadedIndexMetadata index2Metadata = new UploadedIndexMetadata("index2", "indexUUID2", "index_metadata2"); @@ -199,6 +215,45 @@ public void testDeleteClusterMetadata() throws IOException { .settingMetadata(settingMetadataUpdated) .build(); + UploadedIndexMetadata index3Metadata = new UploadedIndexMetadata("index3", "indexUUID3", "index_metadata3"); + UploadedIndexMetadata index4Metadata = new UploadedIndexMetadata("index4", "indexUUID4", "index_metadata4"); + List indicesRouting1 = List.of(index3Metadata, index4Metadata); + List indicesRouting2 = List.of(index4Metadata); + ClusterMetadataManifest manifest6 = ClusterMetadataManifest.builder() + .indices(List.of(index1Metadata)) + .coordinationMetadata(coordinationMetadataUpdated) + .templatesMetadata(templateMetadataUpdated) + .settingMetadata(settingMetadataUpdated) + .clusterTerm(1L) + .stateVersion(1L) + .codecVersion(CODEC_V3) + .stateUUID(randomAlphaOfLength(10)) + .clusterUUID(clusterUUID) + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(ClusterState.UNKNOWN_UUID) + .committed(true) + .routingTableVersion(0L) + .indicesRouting(indicesRouting1) + .build(); + ClusterMetadataManifest manifest7 = ClusterMetadataManifest.builder() + .indices(List.of(index2Metadata)) + .coordinationMetadata(coordinationMetadataUpdated) + .templatesMetadata(templateMetadataUpdated) + .settingMetadata(settingMetadataUpdated) + .clusterTerm(1L) + .stateVersion(1L) + .codecVersion(CODEC_V3) + .stateUUID(randomAlphaOfLength(10)) + .clusterUUID(clusterUUID) + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(ClusterState.UNKNOWN_UUID) + .committed(true) + .routingTableVersion(0L) + .indicesRouting(indicesRouting2) + .build(); + // active manifest have reference to index1Updated, index2, settingsUpdated, coordinationUpdated, templates, templatesUpdated ClusterMetadataManifest manifest4 = ClusterMetadataManifest.builder(manifest3) .coordinationMetadata(coordinationMetadataUpdated) @@ -208,10 +263,13 @@ public void testDeleteClusterMetadata() throws IOException { when(remoteClusterStateService.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn( manifest4, manifest5, + manifest7, manifest1, manifest2, - manifest3 + manifest3, + manifest6 ); + BlobContainer container = mock(BlobContainer.class); when(blobStore.blobContainer(any())).thenReturn(container); doNothing().when(container).deleteBlobsIgnoringIfNotExists(any()); @@ -231,9 +289,31 @@ public void testDeleteClusterMetadata() throws IOException { + ".dat" ) ); + + verify(mockedRemoteRoutingTableService).deleteStaleIndexRoutingPaths(List.of(index3Metadata.getUploadedFilename())); Set staleManifest = new HashSet<>(); inactiveBlobs.forEach(blob -> staleManifest.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blob.name())); verify(container).deleteBlobsIgnoringIfNotExists(new ArrayList<>(staleManifest)); + + // Test case when remoteRoutingTableService is null + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(clusterState.getClusterName()).thenReturn(new ClusterName("test")); + when(metadata.clusterUUID()).thenReturn("testUUID"); + when(clusterState.metadata()).thenReturn(metadata); + when(clusterApplierService.state()).thenReturn(clusterState); + when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService); + + mockedRemoteRoutingTableService = mock(RemoteRoutingTableService.class); + remoteRoutingTableService = Optional.of(mockedRemoteRoutingTableService); + + remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager( + remoteClusterStateService, + clusterService, + Optional.empty() + ); + remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs); + verify(mockedRemoteRoutingTableService, never()).deleteStaleIndexRoutingPaths(any()); } public void testDeleteStaleClusterUUIDs() throws IOException {