diff --git a/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index 26d86e9f..13d62151 100644 --- a/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -82,7 +82,7 @@ public Collection createComponents( OperationalMetricsCounter.initialize(clusterService.getClusterName().toString(), metricsRegistry); // create top n queries service final QueryInsightsService queryInsightsService = new QueryInsightsService( - clusterService.getClusterSettings(), + clusterService, threadPool, client, metricsRegistry, @@ -145,6 +145,7 @@ public List> getSettings() { QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME, QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE, QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING, + QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER, QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY ); } diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java index ef396bcc..a5085fe1 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java @@ -8,22 +8,31 @@ package org.opensearch.plugin.insights.core.exporter; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_DELETE_AFTER_VALUE; + +import java.time.Instant; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.bulk.BulkRequestBuilder; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.plugin.insights.core.metrics.OperationalMetric; import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter; +import org.opensearch.plugin.insights.core.service.TopQueriesService; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; /** @@ -36,6 +45,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter { private final Logger logger = LogManager.getLogger(); private final Client client; private DateTimeFormatter indexPattern; + private int deleteAfter; /** * Constructor of LocalIndexExporter @@ -46,6 +56,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter { public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) { this.indexPattern = indexPattern; this.client = client; + this.deleteAfter = DEFAULT_DELETE_AFTER_VALUE; } /** @@ -61,11 +72,9 @@ public DateTimeFormatter getIndexPattern() { * Setter of indexPattern * * @param indexPattern index pattern - * @return the current LocalIndexExporter */ - public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) { + void setIndexPattern(DateTimeFormatter indexPattern) { this.indexPattern = indexPattern; - return this; } /** @@ -75,15 +84,15 @@ public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) { */ @Override public void export(final List records) { - if (records == null || records.size() == 0) { + if (records == null || records.isEmpty()) { return; } try { - final String index = getDateTimeFromFormat(); + final String indexName = buildLocalIndexName(); final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1)); for (SearchQueryRecord record : records) { bulkRequestBuilder.add( - new IndexRequest(index).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + new IndexRequest(indexName).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) ); } bulkRequestBuilder.execute(new ActionListener() { @@ -110,7 +119,52 @@ public void close() { logger.debug("Closing the LocalIndexExporter.."); } - private String getDateTimeFromFormat() { - return indexPattern.format(ZonedDateTime.now(ZoneOffset.UTC)); + /** + * Builds the local index name using the current UTC datetime + * + * @return A string representing the index name in the format "top_queries-YYYY.MM.dd-01234". + */ + String buildLocalIndexName() { + return indexPattern.format(ZonedDateTime.now(ZoneOffset.UTC)) + "-" + generateLocalIndexDateHash(); + } + + /** + * Set local index exporter data retention period + * + * @param deleteAfter the number of days after which Top N local indices should be deleted + */ + public void setDeleteAfter(final int deleteAfter) { + this.deleteAfter = deleteAfter; + } + + /** + * Delete Top N local indices older than the configured data retention period + * + * @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata} + */ + public void deleteExpiredTopNIndices(final Map indexMetadataMap) { + long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(deleteAfter); + for (Map.Entry entry : indexMetadataMap.entrySet()) { + String indexName = entry.getKey(); + if (isTopQueriesIndex(indexName) && entry.getValue().getCreationDate() <= expirationMillisLong) { + // delete this index + TopQueriesService.deleteSingleIndex(indexName, client); + } + } + } + + /** + * Generates a consistent 5-digit numeric hash based on the current UTC date. + * The generated hash is deterministic, meaning it will return the same result for the same date. + * + * @return A 5-digit numeric string representation of the current date's hash. + */ + public static String generateLocalIndexDateHash() { + // Get the current date in UTC (yyyy-MM-dd format) + String currentDate = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ROOT) + .format(Instant.now().atOffset(ZoneOffset.UTC).toLocalDate()); + + // Generate a 5-digit numeric hash from the date's hashCode + return String.format(Locale.ROOT, "%05d", (currentDate.hashCode() % 100000 + 100000) % 100000); } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/metrics/OperationalMetric.java b/src/main/java/org/opensearch/plugin/insights/core/metrics/OperationalMetric.java index 626c09e9..75ede42a 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/metrics/OperationalMetric.java +++ b/src/main/java/org/opensearch/plugin/insights/core/metrics/OperationalMetric.java @@ -13,9 +13,9 @@ public enum OperationalMetric { LOCAL_INDEX_READER_PARSING_EXCEPTIONS("Number of errors when parsing with LocalIndexReader"), LOCAL_INDEX_EXPORTER_BULK_FAILURES("Number of failures when ingesting Query Insights data to local indices"), + LOCAL_INDEX_EXPORTER_DELETE_FAILURES("Number of failures when deleting local indices"), LOCAL_INDEX_EXPORTER_EXCEPTIONS("Number of exceptions in Query Insights LocalIndexExporter"), INVALID_EXPORTER_TYPE_FAILURES("Number of invalid exporter type failures"), - INVALID_INDEX_PATTERN_EXCEPTIONS("Number of invalid index pattern exceptions"), DATA_INGEST_EXCEPTIONS("Number of exceptions during data ingest in Query Insights"), QUERY_CATEGORIZE_EXCEPTIONS("Number of exceptions when categorizing the queries"), EXPORTER_FAIL_TO_CLOSE_EXCEPTION("Number of failures when closing the exporter"), diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java index df8ba7a6..f7e4b8e2 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java @@ -8,6 +8,8 @@ package org.opensearch.plugin.insights.core.reader; +import static org.opensearch.plugin.insights.core.exporter.LocalIndexExporter.generateLocalIndexDateHash; + import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; @@ -99,8 +101,8 @@ public List read(final String from, final String to) { } ZonedDateTime curr = start; while (curr.isBefore(end.plusDays(1).toLocalDate().atStartOfDay(end.getZone()))) { - String index = getDateTimeFromFormat(curr); - SearchRequest searchRequest = new SearchRequest(index); + String indexName = buildLocalIndexName(curr); + SearchRequest searchRequest = new SearchRequest(indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); MatchQueryBuilder excludeQuery = QueryBuilders.matchQuery("indices", "top_queries*"); RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("timestamp") @@ -135,7 +137,7 @@ public void close() { logger.debug("Closing the LocalIndexReader.."); } - private String getDateTimeFromFormat(ZonedDateTime current) { - return current.format(indexPattern); + private String buildLocalIndexName(ZonedDateTime current) { + return current.format(indexPattern) + "-" + generateLocalIndexDateHash(); } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 150f5e3a..2f032a72 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -10,6 +10,7 @@ import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings; import java.io.IOException; @@ -19,13 +20,15 @@ import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; -import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -52,13 +55,15 @@ public class QueryInsightsService extends AbstractLifecycleComponent { private static final Logger logger = LogManager.getLogger(QueryInsightsService.class); + private final ClusterService clusterService; + /** * The internal OpenSearch thread pool that execute async processing and exporting tasks */ private final ThreadPool threadPool; /** - * Services to capture top n queries for different metric types + * Map of {@link MetricType} to associated {@link TopQueriesService} */ private final Map topQueriesServices; @@ -73,10 +78,10 @@ public class QueryInsightsService extends AbstractLifecycleComponent { private final LinkedBlockingQueue queryRecordsQueue; /** - * Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when + * List of references to delayed operations {@link Scheduler.Cancellable} so they can be cancelled when * the service closed concurrently. */ - protected volatile Scheduler.Cancellable scheduledFuture; + protected volatile List scheduledFutures; /** * Query Insights exporter factory @@ -102,7 +107,7 @@ public class QueryInsightsService extends AbstractLifecycleComponent { /** * Constructor of the QueryInsightsService * - * @param clusterSettings OpenSearch cluster level settings + * @param clusterService OpenSearch cluster service * @param threadPool The OpenSearch thread pool to run async tasks * @param client OS client * @param metricsRegistry Opentelemetry Metrics registry @@ -110,12 +115,13 @@ public class QueryInsightsService extends AbstractLifecycleComponent { */ @Inject public QueryInsightsService( - final ClusterSettings clusterSettings, + final ClusterService clusterService, final ThreadPool threadPool, final Client client, final MetricsRegistry metricsRegistry, final NamedXContentRegistry namedXContentRegistry ) { + this.clusterService = clusterService; enableCollect = new HashMap<>(); queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY); this.threadPool = threadPool; @@ -128,15 +134,22 @@ public QueryInsightsService( enableCollect.put(metricType, false); topQueriesServices.put( metricType, - new TopQueriesService(metricType, threadPool, queryInsightsExporterFactory, queryInsightsReaderFactory) + new TopQueriesService(client, metricType, threadPool, queryInsightsExporterFactory, queryInsightsReaderFactory) ); } for (MetricType type : MetricType.allMetricTypes()) { - clusterSettings.addSettingsUpdateConsumer( - getExporterSettings(type), - (settings -> setExporterAndReader(type, settings)), - (settings -> validateExporterAndReaderConfig(type, settings)) - ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + getExporterSettings(type), + (settings -> setExporterAndReader(type, settings, clusterService.state().metadata().indices())), + (settings -> validateExporterAndReaderConfig(type, settings)) + ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + TOP_N_EXPORTER_DELETE_AFTER, + (settings -> setExporterDeleteAfter(type, settings)), + (TopQueriesService::validateExporterDeleteAfter) + ); } this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry); @@ -389,14 +402,26 @@ public void setTopNSize(final MetricType type, final int topNSize) { * @param type {@link MetricType} * @param settings exporter and reader settings */ - public void setExporterAndReader(final MetricType type, final Settings settings) { + private void setExporterAndReader(final MetricType type, final Settings settings, final Map indexMetadataMap) { if (topQueriesServices.containsKey(type)) { TopQueriesService tqs = topQueriesServices.get(type); - tqs.setExporter(settings); + tqs.setExporter(settings, indexMetadataMap); tqs.setReader(settings, namedXContentRegistry); } } + /** + * Set the exporter delete after + * + * @param type {@link MetricType} + * @param deleteAfter the number of days after which Top N local indices should be deleted + */ + private void setExporterDeleteAfter(final MetricType type, final int deleteAfter) { + if (topQueriesServices.containsKey(type)) { + topQueriesServices.get(type).setExporterDeleteAfter(deleteAfter); + } + } + /** * Get search query categorizer object * @return SearchQueryCategorizer object @@ -421,18 +446,32 @@ public void validateExporterAndReaderConfig(final MetricType type, final Setting @Override protected void doStart() { if (isAnyFeatureEnabled()) { - scheduledFuture = threadPool.scheduleWithFixedDelay( - this::drainRecords, - QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL, - QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR + scheduledFutures = new ArrayList<>(); + scheduledFutures.add( + threadPool.scheduleWithFixedDelay( + this::drainRecords, + QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL, + QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR + ) + ); + scheduledFutures.add( + threadPool.scheduleWithFixedDelay( + this::deleteExpiredTopNIndices, + new TimeValue(1, TimeUnit.DAYS), // Check for deletable indices once per day + QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR + ) ); } } @Override protected void doStop() { - if (scheduledFuture != null) { - scheduledFuture.cancel(); + if (scheduledFutures != null) { + for (Scheduler.Cancellable cancellable : scheduledFutures) { + if (cancellable != null) { + cancellable.cancel(); + } + } } } @@ -462,4 +501,13 @@ public QueryInsightsHealthStats getHealthStats() { topQueriesHealthStatsMap ); } + + /** + * Delete Top N local indices older than the configured data retention period + */ + private void deleteExpiredTopNIndices() { + for (MetricType metricType : MetricType.allMetricTypes()) { + topQueriesServices.get(metricType).deleteExpiredTopNIndices(clusterService.state().metadata().indices()); + } + } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java index 3b533f4c..b64bb123 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -11,14 +11,19 @@ import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MAX_DELETE_AFTER_VALUE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MIN_DELETE_AFTER_VALUE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR; import java.io.IOException; import java.time.Instant; +import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZoneOffset; import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; @@ -33,9 +38,14 @@ import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.plugin.insights.core.exporter.LocalIndexExporter; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; import org.opensearch.plugin.insights.core.exporter.SinkType; @@ -68,6 +78,7 @@ public class TopQueriesService { */ private final Logger logger = LogManager.getLogger(); private boolean enabled; + private final Client client; /** * The metric type to measure top n queries */ @@ -117,15 +128,17 @@ public class TopQueriesService { private QueryInsightsExporter exporter; private QueryInsightsReader reader; - private QueryGrouper queryGrouper; + private final QueryGrouper queryGrouper; TopQueriesService( + final Client client, final MetricType metricType, final ThreadPool threadPool, final QueryInsightsExporterFactory queryInsightsExporterFactory, QueryInsightsReaderFactory queryInsightsReaderFactory ) { this.enabled = false; + this.client = client; this.metricType = metricType; this.threadPool = threadPool; this.queryInsightsExporterFactory = queryInsightsExporterFactory; @@ -261,7 +274,11 @@ public void validateWindowSize(final TimeValue windowSize) { * * @param settings exporter config {@link Settings} */ - public void setExporter(final Settings settings) { + public void setExporter(final Settings settings, final Map indexMetadataMap) { + // This method is called only when sink type is changed + // Clear all top_queries-* indices + deleteAllTopNIndices(indexMetadataMap); + if (settings.get(EXPORTER_TYPE) != null) { SinkType expectedType = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)); if (exporter != null && expectedType == SinkType.getSinkTypeFromExporter(exporter)) { @@ -536,4 +553,111 @@ private void drain() { public TopQueriesHealthStats getHealthStats() { return new TopQueriesHealthStats(this.topQueriesStore.size(), this.queryGrouper.getHealthStats()); } + + /** + * Validate the exporter delete after value + * + * @param deleteAfter exporter and reader settings + */ + static void validateExporterDeleteAfter(final int deleteAfter) { + if (deleteAfter < MIN_DELETE_AFTER_VALUE || deleteAfter > MAX_DELETE_AFTER_VALUE) { + OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.INVALID_EXPORTER_TYPE_FAILURES); + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "Invalid exporter delete_after_days setting [%d], value should be an integer between %d and %d.", + deleteAfter, + MIN_DELETE_AFTER_VALUE, + MAX_DELETE_AFTER_VALUE + ) + ); + } + } + + /** + * Set exporter delete after if exporter is a {@link LocalIndexExporter} + * + * @param deleteAfter the number of days after which Top N local indices should be deleted + */ + void setExporterDeleteAfter(final int deleteAfter) { + if (exporter != null && exporter.getClass() == LocalIndexExporter.class) { + ((LocalIndexExporter) exporter).setDeleteAfter(deleteAfter); + } + } + + /** + * Delete Top N local indices older than the configured data retention period + */ + void deleteExpiredTopNIndices(final Map indexMetadataMap) { + if (exporter != null && exporter.getClass() == LocalIndexExporter.class) { + threadPool.executor(QUERY_INSIGHTS_EXECUTOR) + .execute(() -> ((LocalIndexExporter) exporter).deleteExpiredTopNIndices(indexMetadataMap)); + } + } + + /** + * Deletes all Top N local indices + * + * @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata} + */ + void deleteAllTopNIndices(final Map indexMetadataMap) { + indexMetadataMap.entrySet() + .stream() + .filter(entry -> isTopQueriesIndex(entry.getKey())) + .forEach(entry -> deleteSingleIndex(entry.getKey(), client)); + } + + /** + * Deletes the specified index and logs any failure that occurs during the operation. + * + * @param indexName The name of the index to delete. + * @param client The OpenSearch client used to perform the deletion. + */ + public static void deleteSingleIndex(String indexName, Client client) { + Logger logger = LogManager.getLogger(); + client.admin().indices().delete(new DeleteIndexRequest(indexName), new ActionListener<>() { + @Override + // CS-SUPPRESS-SINGLE: RegexpSingleline It is not possible to use phrase "cluster manager" instead of master here + public void onResponse(org.opensearch.action.support.master.AcknowledgedResponse acknowledgedResponse) {} + + @Override + public void onFailure(Exception e) { + OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_DELETE_FAILURES); + logger.error("Failed to delete index '{}': ", indexName, e); + } + }); + } + + /** + * Validates if the input string is a Query Insights local index name + * in the format "top_queries-YYYY.MM.dd-XXXXX". + * + * @param indexName the string to validate. + * @return {@code true} if the string is valid, {@code false} otherwise. + */ + public static boolean isTopQueriesIndex(String indexName) { + // Split the input string by '-' + String[] parts = indexName.split("-"); + + // Check if the string has exactly 3 parts + if (parts.length != 3) { + return false; + } + + // Validate the first part is "top_queries" + if (!"top_queries".equals(parts[0])) { + return false; + } + + // Validate the second part is a valid date in "YYYY.MM.dd" format + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy.MM.dd", Locale.ROOT); + try { + LocalDate.parse(parts[1], formatter); + } catch (DateTimeParseException e) { + return false; + } + + // Validate the third part is exactly 5 digits + return parts[2].matches("\\d{5}"); + } } diff --git a/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java index b7ffefb7..382c4626 100644 --- a/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java +++ b/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -239,6 +239,31 @@ public class QueryInsightsSettings { * Default exporter type of top queries */ public static final String DEFAULT_TOP_QUERIES_EXPORTER_TYPE = SinkType.LOCAL_INDEX.toString(); + /** + * Default Top N local indices retention period in days + */ + public static final int DEFAULT_DELETE_AFTER_VALUE = 7; + /** + * Minimum Top N local indices retention period in days + */ + public static final int MIN_DELETE_AFTER_VALUE = 1; + /** + * Maximum Top N local indices retention period in days + */ + public static final int MAX_DELETE_AFTER_VALUE = 180; + + /** + * Setting for Top N local indices retention period + *

+ * Note: This setting is only applicable when sink type is "local_index" + * and it applies to exporters of all metric types + */ + public static final Setting TOP_N_EXPORTER_DELETE_AFTER = Setting.intSetting( + TOP_N_QUERIES_SETTING_PREFIX + ".delete_after_days", + DEFAULT_DELETE_AFTER_VALUE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); /** * Settings for the exporter of top latency queries diff --git a/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java b/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java index de4354b0..8d8dad0b 100644 --- a/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java +++ b/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java @@ -82,6 +82,7 @@ public void testGetSettings() { QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME, QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE, QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING, + QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER, QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY ), queryInsightsPlugin.getSettings() diff --git a/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java index a7151933..543518fd 100644 --- a/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java +++ b/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -339,6 +339,7 @@ public static void registerAllQueryInsightsSettings(ClusterSettings clusterSetti clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER); clusterSettings.registerSetting(QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING); } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java b/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java index a7c934a9..7e5db95c 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java @@ -8,21 +8,36 @@ package org.opensearch.plugin.insights.core.exporter; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; +import static org.opensearch.plugin.insights.core.exporter.LocalIndexExporter.generateLocalIndexDateHash; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN; +import java.time.Instant; import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; import java.util.List; import java.util.Locale; +import java.util.Map; import org.junit.Before; +import org.opensearch.Version; import org.opensearch.action.bulk.BulkAction; import org.opensearch.action.bulk.BulkRequestBuilder; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.client.AdminClient; import org.opensearch.client.Client; +import org.opensearch.client.IndicesAdminClient; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.test.OpenSearchTestCase; @@ -33,11 +48,16 @@ public class LocalIndexExporterTests extends OpenSearchTestCase { private final DateTimeFormatter format = DateTimeFormatter.ofPattern("YYYY.MM.dd", Locale.ROOT); private final Client client = mock(Client.class); + private final AdminClient adminClient = mock(AdminClient.class); + private final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); private LocalIndexExporter localIndexExporter; @Before public void setup() { localIndexExporter = new LocalIndexExporter(client, format); + + when(client.admin()).thenReturn(adminClient); + when(adminClient.indices()).thenReturn(indicesAdminClient); } public void testExportEmptyRecords() { @@ -95,4 +115,33 @@ public void testGetAndSetIndexPattern() { localIndexExporter.setIndexPattern(newFormatter); assert (localIndexExporter.getIndexPattern() == newFormatter); } + + public void testDeleteExpiredTopNIndices() { + // Reset exporter index pattern to default + localIndexExporter.setIndexPattern(DateTimeFormatter.ofPattern(DEFAULT_TOP_N_QUERIES_INDEX_PATTERN, Locale.ROOT)); + + // Create 9 top_queries-* indices + Map indexMetadataMap = new HashMap<>(); + for (int i = 1; i < 10; i++) { + String indexName = "top_queries-2024.01.0" + i + "-" + generateLocalIndexDateHash(); + long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(SETTING_CREATION_DATE, creationTime) + ) + .build(); + indexMetadataMap.put(indexName, indexMetadata); + } + localIndexExporter.deleteExpiredTopNIndices(indexMetadataMap); + // Default retention is 7 days + // Oldest 3 of 10 indices should be deleted + verify(client, times(3)).admin(); + verify(adminClient, times(3)).indices(); + verify(indicesAdminClient, times(3)).delete(any(), any()); + } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java index 279fee2a..33d89b29 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit; import org.junit.Before; import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -59,7 +60,7 @@ public void setup() { new ScalingExecutorBuilder(QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR, 1, 5, TimeValue.timeValueMinutes(5)) ); queryInsightsService = new QueryInsightsService( - clusterSettings, + new ClusterService(settings, clusterSettings, threadPool), threadPool, client, NoopMetricsRegistry.INSTANCE, diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java index 8d701b5b..8e2bdacc 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java @@ -10,12 +10,28 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; +import static org.opensearch.plugin.insights.core.exporter.LocalIndexExporter.generateLocalIndexDateHash; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.validateExporterDeleteAfter; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.client.AdminClient; +import org.opensearch.client.Client; +import org.opensearch.client.IndicesAdminClient; import org.opensearch.cluster.coordination.DeterministicTaskQueue; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; @@ -36,23 +52,34 @@ */ public class TopQueriesServiceTests extends OpenSearchTestCase { private TopQueriesService topQueriesService; + private final Client client = mock(Client.class); private final ThreadPool threadPool = mock(ThreadPool.class); private final QueryInsightsExporterFactory queryInsightsExporterFactory = mock(QueryInsightsExporterFactory.class); private final QueryInsightsReaderFactory queryInsightsReaderFactory = mock(QueryInsightsReaderFactory.class); - private MetricsRegistry metricsRegistry; + private final AdminClient adminClient = mock(AdminClient.class); + private final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); @Before public void setup() { - topQueriesService = new TopQueriesService(MetricType.LATENCY, threadPool, queryInsightsExporterFactory, queryInsightsReaderFactory); + topQueriesService = new TopQueriesService( + client, + MetricType.LATENCY, + threadPool, + queryInsightsExporterFactory, + queryInsightsReaderFactory + ); topQueriesService.setTopNSize(Integer.MAX_VALUE); topQueriesService.setWindowSize(new TimeValue(Long.MAX_VALUE)); topQueriesService.setEnabled(true); - metricsRegistry = mock(MetricsRegistry.class); + MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); when(metricsRegistry.createCounter(any(String.class), any(String.class), any(String.class))).thenAnswer( invocation -> mock(Counter.class) ); OperationalMetricsCounter.initialize("cluster", metricsRegistry); + + when(client.admin()).thenReturn(adminClient); + when(adminClient.indices()).thenReturn(indicesAdminClient); } public void testIngestQueryDataWithLargeWindow() { @@ -180,4 +207,72 @@ public void testGetHealthStats_WithData() { // Assuming no grouping by default, expect QueryGroupCount to be 0 assertEquals(0, healthStats.getQueryGrouperHealthStats().getQueryGroupCount()); } + + public void testValidateExporterDeleteAfter() { + validateExporterDeleteAfter(7); + validateExporterDeleteAfter(180); + validateExporterDeleteAfter(1); + assertThrows(IllegalArgumentException.class, () -> { validateExporterDeleteAfter(-1); }); + assertThrows(IllegalArgumentException.class, () -> { validateExporterDeleteAfter(0); }); + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { validateExporterDeleteAfter(181); }); + assertEquals( + "Invalid exporter delete_after_days setting [181], value should be an integer between 1 and 180.", + exception.getMessage() + ); + } + + public void testDeleteAllTopNIndices() { + // Create 9 top_queries-* indices + Map indexMetadataMap = new HashMap<>(); + for (int i = 1; i < 10; i++) { + String indexName = "top_queries-2024.01.0" + i + "-" + generateLocalIndexDateHash(); + long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(SETTING_CREATION_DATE, creationTime) + ) + .build(); + indexMetadataMap.put(indexName, indexMetadata); + } + // Create 5 user indices + for (int i = 0; i < 5; i++) { + String indexName = "my_index-" + i; + long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(SETTING_CREATION_DATE, creationTime) + ) + .build(); + indexMetadataMap.put(indexName, indexMetadata); + } + + topQueriesService.deleteAllTopNIndices(indexMetadataMap); + // All 10 indices should be deleted + verify(client, times(9)).admin(); + verify(adminClient, times(9)).indices(); + verify(indicesAdminClient, times(9)).delete(any(), any()); + } + + public void testIsTopQueriesIndex() { + assertTrue(isTopQueriesIndex("top_queries-2024.01.01-01234")); + assertTrue(isTopQueriesIndex("top_queries-2025.12.12-99999")); + + assertFalse(isTopQueriesIndex("top_queries-2024.01.01-012345")); + assertFalse(isTopQueriesIndex("top_queries-2024.01.01-0123w")); + assertFalse(isTopQueriesIndex("top_queries-2024.01.01")); + assertFalse(isTopQueriesIndex("top_queries-2024.01.32-01234")); + assertFalse(isTopQueriesIndex("top_queries-01234")); + assertFalse(isTopQueriesIndex("top_querie-2024.01.01-01234")); + assertFalse(isTopQueriesIndex("2024.01.01-01234")); + } }