diff --git a/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index 26d86e9f..13d62151 100644 --- a/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -82,7 +82,7 @@ public Collection createComponents( OperationalMetricsCounter.initialize(clusterService.getClusterName().toString(), metricsRegistry); // create top n queries service final QueryInsightsService queryInsightsService = new QueryInsightsService( - clusterService.getClusterSettings(), + clusterService, threadPool, client, metricsRegistry, @@ -145,6 +145,7 @@ public List> getSettings() { QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME, QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE, QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING, + QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER, QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY ); } diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java index ef396bcc..de43867e 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java @@ -8,16 +8,26 @@ package org.opensearch.plugin.insights.core.exporter; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_DELETE_AFTER_VALUE; + +import java.time.Instant; +import java.time.LocalDate; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.bulk.BulkRequestBuilder; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; @@ -36,6 +46,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter { private final Logger logger = LogManager.getLogger(); private final Client client; private DateTimeFormatter indexPattern; + private int deleteAfter; /** * Constructor of LocalIndexExporter @@ -46,6 +57,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter { public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) { this.indexPattern = indexPattern; this.client = client; + this.deleteAfter = DEFAULT_DELETE_AFTER_VALUE; } /** @@ -61,11 +73,9 @@ public DateTimeFormatter getIndexPattern() { * Setter of indexPattern * * @param indexPattern index pattern - * @return the current LocalIndexExporter */ - public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) { + void setIndexPattern(DateTimeFormatter indexPattern) { this.indexPattern = indexPattern; - return this; } /** @@ -75,15 +85,15 @@ public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) { */ @Override public void export(final List records) { - if (records == null || records.size() == 0) { + if (records == null || records.isEmpty()) { return; } try { - final String index = getDateTimeFromFormat(); + final String indexName = buildLocalIndexName(); final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1)); for (SearchQueryRecord record : records) { bulkRequestBuilder.add( - new IndexRequest(index).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + new IndexRequest(indexName).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) ); } bulkRequestBuilder.execute(new ActionListener() { @@ -110,7 +120,92 @@ public void close() { logger.debug("Closing the LocalIndexExporter.."); } - private String getDateTimeFromFormat() { - return indexPattern.format(ZonedDateTime.now(ZoneOffset.UTC)); + /** + * Builds the local index name using the current UTC datetime + * + * @return A string representing the index name in the format "top_queries-YYYY.MM.dd-01234". + */ + String buildLocalIndexName() { + return indexPattern.format(ZonedDateTime.now(ZoneOffset.UTC)) + "-" + generateLocalIndexDateHash(); + } + + /** + * Set local index exporter data retention period + * + * @param deleteAfter the number of days after which Top N local indices should be deleted + */ + public void setDeleteAfter(final int deleteAfter) { + this.deleteAfter = deleteAfter; + } + + /** + * Delete Top N local indices older than the configured data retention period + * + * @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata} + */ + public void deleteExpiredIndices(final Map indexMetadataMap) { + long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(deleteAfter); + for (Map.Entry entry : indexMetadataMap.entrySet()) { + String indexName = entry.getKey(); + if (!isTopQueriesIndex(indexName)) { + continue; + } + if (entry.getValue().getCreationDate() <= expirationMillisLong) { + // delete this index + client.admin().indices().delete(new DeleteIndexRequest(indexName)); + } + } + } + + /** + * Validates if the input string is a Query Insights local index name + * in the format "top_queries-YYYY.MM.dd-XXXXX". + * + * @param indexName the string to validate. + * @return {@code true} if the string is valid, {@code false} otherwise. + */ + static boolean isTopQueriesIndex(String indexName) { + // Split the input string by '-' + String[] parts = indexName.split("-"); + + // Check if the string has exactly 3 parts + if (parts.length != 3) { + return false; + } + + // Validate the first part is "top_queries" + if (!"top_queries".equals(parts[0])) { + return false; + } + + // Validate the second part is a valid date in "YYYY.MM.dd" format + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy.MM.dd", Locale.ROOT); + try { + LocalDate.parse(parts[1], formatter); + } catch (DateTimeParseException e) { + return false; + } + + // Validate the third part is exactly 5 digits + if (!parts[2].matches("\\d{5}")) { + return false; + } + + return true; + } + + /** + * Generates a consistent 5-digit numeric hash based on the current UTC date. + * The generated hash is deterministic, meaning it will return the same result for the same date. + * + * @return A 5-digit numeric string representation of the current date's hash. + */ + public static String generateLocalIndexDateHash() { + // Get the current date in UTC (yyyy-MM-dd format) + String currentDate = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ROOT) + .format(Instant.now().atOffset(ZoneOffset.UTC).toLocalDate()); + + // Generate a 5-digit numeric hash from the date's hashCode + return String.format(Locale.ROOT, "%05d", (currentDate.hashCode() % 100000 + 100000) % 100000); } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/metrics/OperationalMetric.java b/src/main/java/org/opensearch/plugin/insights/core/metrics/OperationalMetric.java index 626c09e9..75ede42a 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/metrics/OperationalMetric.java +++ b/src/main/java/org/opensearch/plugin/insights/core/metrics/OperationalMetric.java @@ -13,9 +13,9 @@ public enum OperationalMetric { LOCAL_INDEX_READER_PARSING_EXCEPTIONS("Number of errors when parsing with LocalIndexReader"), LOCAL_INDEX_EXPORTER_BULK_FAILURES("Number of failures when ingesting Query Insights data to local indices"), + LOCAL_INDEX_EXPORTER_DELETE_FAILURES("Number of failures when deleting local indices"), LOCAL_INDEX_EXPORTER_EXCEPTIONS("Number of exceptions in Query Insights LocalIndexExporter"), INVALID_EXPORTER_TYPE_FAILURES("Number of invalid exporter type failures"), - INVALID_INDEX_PATTERN_EXCEPTIONS("Number of invalid index pattern exceptions"), DATA_INGEST_EXCEPTIONS("Number of exceptions during data ingest in Query Insights"), QUERY_CATEGORIZE_EXCEPTIONS("Number of exceptions when categorizing the queries"), EXPORTER_FAIL_TO_CLOSE_EXCEPTION("Number of failures when closing the exporter"), diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java index df8ba7a6..f7e4b8e2 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java @@ -8,6 +8,8 @@ package org.opensearch.plugin.insights.core.reader; +import static org.opensearch.plugin.insights.core.exporter.LocalIndexExporter.generateLocalIndexDateHash; + import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; @@ -99,8 +101,8 @@ public List read(final String from, final String to) { } ZonedDateTime curr = start; while (curr.isBefore(end.plusDays(1).toLocalDate().atStartOfDay(end.getZone()))) { - String index = getDateTimeFromFormat(curr); - SearchRequest searchRequest = new SearchRequest(index); + String indexName = buildLocalIndexName(curr); + SearchRequest searchRequest = new SearchRequest(indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); MatchQueryBuilder excludeQuery = QueryBuilders.matchQuery("indices", "top_queries*"); RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("timestamp") @@ -135,7 +137,7 @@ public void close() { logger.debug("Closing the LocalIndexReader.."); } - private String getDateTimeFromFormat(ZonedDateTime current) { - return current.format(indexPattern); + private String buildLocalIndexName(ZonedDateTime current) { + return current.format(indexPattern) + "-" + generateLocalIndexDateHash(); } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 150f5e3a..7669c78b 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -10,6 +10,7 @@ import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings; import java.io.IOException; @@ -19,13 +20,14 @@ import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; -import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -52,13 +54,15 @@ public class QueryInsightsService extends AbstractLifecycleComponent { private static final Logger logger = LogManager.getLogger(QueryInsightsService.class); + private final ClusterService clusterService; + /** * The internal OpenSearch thread pool that execute async processing and exporting tasks */ private final ThreadPool threadPool; /** - * Services to capture top n queries for different metric types + * Map of {@link MetricType} to associated {@link TopQueriesService} */ private final Map topQueriesServices; @@ -73,10 +77,10 @@ public class QueryInsightsService extends AbstractLifecycleComponent { private final LinkedBlockingQueue queryRecordsQueue; /** - * Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when + * List of references to delayed operations {@link Scheduler.Cancellable} so they can be cancelled when * the service closed concurrently. */ - protected volatile Scheduler.Cancellable scheduledFuture; + protected volatile List scheduledFutures; /** * Query Insights exporter factory @@ -102,7 +106,7 @@ public class QueryInsightsService extends AbstractLifecycleComponent { /** * Constructor of the QueryInsightsService * - * @param clusterSettings OpenSearch cluster level settings + * @param clusterService OpenSearch cluster service * @param threadPool The OpenSearch thread pool to run async tasks * @param client OS client * @param metricsRegistry Opentelemetry Metrics registry @@ -110,12 +114,13 @@ public class QueryInsightsService extends AbstractLifecycleComponent { */ @Inject public QueryInsightsService( - final ClusterSettings clusterSettings, + final ClusterService clusterService, final ThreadPool threadPool, final Client client, final MetricsRegistry metricsRegistry, final NamedXContentRegistry namedXContentRegistry ) { + this.clusterService = clusterService; enableCollect = new HashMap<>(); queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY); this.threadPool = threadPool; @@ -132,11 +137,18 @@ public QueryInsightsService( ); } for (MetricType type : MetricType.allMetricTypes()) { - clusterSettings.addSettingsUpdateConsumer( - getExporterSettings(type), - (settings -> setExporterAndReader(type, settings)), - (settings -> validateExporterAndReaderConfig(type, settings)) - ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + getExporterSettings(type), + (settings -> setExporterAndReader(type, settings)), + (settings -> validateExporterAndReaderConfig(type, settings)) + ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + TOP_N_EXPORTER_DELETE_AFTER, + (settings -> setExporterDeleteAfter(type, settings)), + (TopQueriesService::validateExporterDeleteAfter) + ); } this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry); @@ -389,7 +401,7 @@ public void setTopNSize(final MetricType type, final int topNSize) { * @param type {@link MetricType} * @param settings exporter and reader settings */ - public void setExporterAndReader(final MetricType type, final Settings settings) { + private void setExporterAndReader(final MetricType type, final Settings settings) { if (topQueriesServices.containsKey(type)) { TopQueriesService tqs = topQueriesServices.get(type); tqs.setExporter(settings); @@ -397,6 +409,18 @@ public void setExporterAndReader(final MetricType type, final Settings settings) } } + /** + * Set the exporter delete after + * + * @param type {@link MetricType} + * @param deleteAfter the number of days after which Top N local indices should be deleted + */ + private void setExporterDeleteAfter(final MetricType type, final int deleteAfter) { + if (topQueriesServices.containsKey(type)) { + topQueriesServices.get(type).setExporterDeleteAfter(deleteAfter); + } + } + /** * Get search query categorizer object * @return SearchQueryCategorizer object @@ -421,18 +445,32 @@ public void validateExporterAndReaderConfig(final MetricType type, final Setting @Override protected void doStart() { if (isAnyFeatureEnabled()) { - scheduledFuture = threadPool.scheduleWithFixedDelay( - this::drainRecords, - QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL, - QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR + scheduledFutures = new ArrayList<>(); + scheduledFutures.add( + threadPool.scheduleWithFixedDelay( + this::drainRecords, + QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL, + QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR + ) + ); + scheduledFutures.add( + threadPool.scheduleWithFixedDelay( + this::deleteExpiredIndices, + new TimeValue(1, TimeUnit.DAYS), // Check for deletable indices once per day + QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR + ) ); } } @Override protected void doStop() { - if (scheduledFuture != null) { - scheduledFuture.cancel(); + if (scheduledFutures != null) { + for (Scheduler.Cancellable cancellable : scheduledFutures) { + if (cancellable != null) { + cancellable.cancel(); + } + } } } @@ -462,4 +500,13 @@ public QueryInsightsHealthStats getHealthStats() { topQueriesHealthStatsMap ); } + + /** + * Delete Top N local indices older than the configured data retention period + */ + private void deleteExpiredIndices() { + for (MetricType metricType : MetricType.allMetricTypes()) { + topQueriesServices.get(metricType).deleteExpiredIndices(clusterService.state().metadata().indices()); + } + } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java index 3b533f4c..649db823 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -11,6 +11,8 @@ import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MAX_DELETE_AFTER_VALUE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MIN_DELETE_AFTER_VALUE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR; import java.io.IOException; @@ -33,9 +35,11 @@ import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.plugin.insights.core.exporter.LocalIndexExporter; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; import org.opensearch.plugin.insights.core.exporter.SinkType; @@ -536,4 +540,45 @@ private void drain() { public TopQueriesHealthStats getHealthStats() { return new TopQueriesHealthStats(this.topQueriesStore.size(), this.queryGrouper.getHealthStats()); } + + /** + * Validate the exporter delete after value + * + * @param deleteAfter exporter and reader settings + */ + static void validateExporterDeleteAfter(final int deleteAfter) { + if (deleteAfter < MIN_DELETE_AFTER_VALUE || deleteAfter > MAX_DELETE_AFTER_VALUE) { + OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.INVALID_EXPORTER_TYPE_FAILURES); + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "Invalid exporter delete_after_days setting [%d], value should be an integer between %d and %d.", + deleteAfter, + MIN_DELETE_AFTER_VALUE, + MAX_DELETE_AFTER_VALUE + ) + ); + } + } + + /** + * Set exporter delete after if exporter is a {@link LocalIndexExporter} + * + * @param deleteAfter the number of days after which Top N local indices should be deleted + */ + void setExporterDeleteAfter(final int deleteAfter) { + if (exporter != null && exporter.getClass() == LocalIndexExporter.class) { + ((LocalIndexExporter) exporter).setDeleteAfter(deleteAfter); + } + } + + /** + * Delete Top N local indices older than the configured data retention period + */ + void deleteExpiredIndices(final Map indexMetadataMap) { + if (exporter != null && exporter.getClass() == LocalIndexExporter.class) { + threadPool.executor(QUERY_INSIGHTS_EXECUTOR) + .execute(() -> ((LocalIndexExporter) exporter).deleteExpiredIndices(indexMetadataMap)); + } + } } diff --git a/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java index b7ffefb7..382c4626 100644 --- a/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java +++ b/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -239,6 +239,31 @@ public class QueryInsightsSettings { * Default exporter type of top queries */ public static final String DEFAULT_TOP_QUERIES_EXPORTER_TYPE = SinkType.LOCAL_INDEX.toString(); + /** + * Default Top N local indices retention period in days + */ + public static final int DEFAULT_DELETE_AFTER_VALUE = 7; + /** + * Minimum Top N local indices retention period in days + */ + public static final int MIN_DELETE_AFTER_VALUE = 1; + /** + * Maximum Top N local indices retention period in days + */ + public static final int MAX_DELETE_AFTER_VALUE = 180; + + /** + * Setting for Top N local indices retention period + *

+ * Note: This setting is only applicable when sink type is "local_index" + * and it applies to exporters of all metric types + */ + public static final Setting TOP_N_EXPORTER_DELETE_AFTER = Setting.intSetting( + TOP_N_QUERIES_SETTING_PREFIX + ".delete_after_days", + DEFAULT_DELETE_AFTER_VALUE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); /** * Settings for the exporter of top latency queries diff --git a/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java b/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java index de4354b0..8d8dad0b 100644 --- a/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java +++ b/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java @@ -82,6 +82,7 @@ public void testGetSettings() { QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME, QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE, QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING, + QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER, QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY ), queryInsightsPlugin.getSettings() diff --git a/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java index a7151933..543518fd 100644 --- a/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java +++ b/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -339,6 +339,7 @@ public static void registerAllQueryInsightsSettings(ClusterSettings clusterSetti clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER); clusterSettings.registerSetting(QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING); } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java b/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java index a7c934a9..9c481c09 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java @@ -8,21 +8,37 @@ package org.opensearch.plugin.insights.core.exporter; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; +import static org.opensearch.plugin.insights.core.exporter.LocalIndexExporter.generateLocalIndexDateHash; +import static org.opensearch.plugin.insights.core.exporter.LocalIndexExporter.isTopQueriesIndex; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN; +import java.time.Instant; import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; import java.util.List; import java.util.Locale; +import java.util.Map; import org.junit.Before; +import org.opensearch.Version; import org.opensearch.action.bulk.BulkAction; import org.opensearch.action.bulk.BulkRequestBuilder; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.client.AdminClient; import org.opensearch.client.Client; +import org.opensearch.client.IndicesAdminClient; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.test.OpenSearchTestCase; @@ -33,11 +49,16 @@ public class LocalIndexExporterTests extends OpenSearchTestCase { private final DateTimeFormatter format = DateTimeFormatter.ofPattern("YYYY.MM.dd", Locale.ROOT); private final Client client = mock(Client.class); + private final AdminClient adminClient = mock(AdminClient.class); + private final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); private LocalIndexExporter localIndexExporter; @Before public void setup() { localIndexExporter = new LocalIndexExporter(client, format); + + when(client.admin()).thenReturn(adminClient); + when(adminClient.indices()).thenReturn(indicesAdminClient); } public void testExportEmptyRecords() { @@ -95,4 +116,48 @@ public void testGetAndSetIndexPattern() { localIndexExporter.setIndexPattern(newFormatter); assert (localIndexExporter.getIndexPattern() == newFormatter); } + + public void testDeleteExpiredIndices() { + // Reset exporter index pattern to default + localIndexExporter.setIndexPattern(DateTimeFormatter.ofPattern(DEFAULT_TOP_N_QUERIES_INDEX_PATTERN, Locale.ROOT)); + + // Create 10 top_queries-YYYY.MM.dd indices + Map indexMetadataMap = new HashMap<>(); + for (int i = 0; i < 10; i++) { + String indexName = "top_queries-2024.01.0" + i + "-" + generateLocalIndexDateHash(); + long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(SETTING_CREATION_DATE, creationTime) + ) + .build(); + indexMetadataMap.put(indexName, indexMetadata); + } + localIndexExporter.deleteExpiredIndices(indexMetadataMap); + // Default retention is 7 days + // Oldest 3 of 10 indices should be deleted + verify(client, times(3)).admin(); + verify(adminClient, times(3)).indices(); + verify(indicesAdminClient, times(3)).delete(any()); + } + + public void testIsTopQueriesIndex() { + localIndexExporter.setIndexPattern(DateTimeFormatter.ofPattern(DEFAULT_TOP_N_QUERIES_INDEX_PATTERN, Locale.ROOT)); + assertTrue(isTopQueriesIndex(localIndexExporter.buildLocalIndexName())); + assertTrue(isTopQueriesIndex("top_queries-2024.01.01-01234")); + assertTrue(isTopQueriesIndex("top_queries-2025.12.12-99999")); + + assertFalse(isTopQueriesIndex("top_queries-2024.01.01-012345")); + assertFalse(isTopQueriesIndex("top_queries-2024.01.01-0123w")); + assertFalse(isTopQueriesIndex("top_queries-2024.01.01")); + assertFalse(isTopQueriesIndex("top_queries-2024.01.32-01234")); + assertFalse(isTopQueriesIndex("top_queries-01234")); + assertFalse(isTopQueriesIndex("top_querie-2024.01.01-01234")); + assertFalse(isTopQueriesIndex("2024.01.01-01234")); + } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java index 279fee2a..33d89b29 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit; import org.junit.Before; import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -59,7 +60,7 @@ public void setup() { new ScalingExecutorBuilder(QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR, 1, 5, TimeValue.timeValueMinutes(5)) ); queryInsightsService = new QueryInsightsService( - clusterSettings, + new ClusterService(settings, clusterSettings, threadPool), threadPool, client, NoopMetricsRegistry.INSTANCE, diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java index 8d701b5b..35554436 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java @@ -11,6 +11,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.validateExporterDeleteAfter; import java.util.List; import java.util.concurrent.TimeUnit; @@ -180,4 +181,15 @@ public void testGetHealthStats_WithData() { // Assuming no grouping by default, expect QueryGroupCount to be 0 assertEquals(0, healthStats.getQueryGrouperHealthStats().getQueryGroupCount()); } + + public void testValidateExporterDeleteAfter() { + assertThrows(IllegalArgumentException.class, () -> { validateExporterDeleteAfter(-1); }); + assertThrows(IllegalArgumentException.class, () -> { validateExporterDeleteAfter(0); }); + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { validateExporterDeleteAfter(181); }); + assertEquals( + "Invalid exporter delete_after_days setting [181], value should be an integer between 1 and 180.", + exception.getMessage() + ); + + } }