From 3a6b46a61fbeae4f36136a57c36299946e0241c0 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Thu, 23 Jan 2025 17:36:38 -0800 Subject: [PATCH 1/7] Move exporter config to query insights level Signed-off-by: Chenyang Ji --- README.md | 3 +- .../plugin/insights/QueryInsightsPlugin.java | 4 +- .../insights/core/exporter/DebugExporter.java | 6 + .../core/exporter/LocalIndexExporter.java | 9 +- .../core/exporter/QueryInsightsExporter.java | 2 + .../QueryInsightsExporterFactory.java | 47 +++--- .../insights/core/exporter/SinkType.java | 6 +- .../core/reader/LocalIndexReader.java | 14 +- .../core/reader/QueryInsightsReader.java | 2 + .../reader/QueryInsightsReaderFactory.java | 62 ++++---- .../core/service/QueryInsightsService.java | 140 +++++++++++++----- .../core/service/TopQueriesService.java | 120 +-------------- .../settings/QueryInsightsSettings.java | 60 ++------ .../insights/QueryInsightsPluginTests.java | 4 +- .../insights/QueryInsightsTestUtils.java | 4 +- .../exporter/LocalIndexExporterTests.java | 2 +- .../QueryInsightsExporterFactoryTests.java | 20 +-- .../exporter/QueryInsightsExporterIT.java | 13 +- .../core/reader/LocalIndexReaderTests.java | 2 +- .../QueryInsightsReaderFactoryTests.java | 9 +- .../service/QueryInsightsServiceTests.java | 58 ++++++++ .../core/service/TopQueriesServiceTests.java | 53 ------- 22 files changed, 292 insertions(+), 348 deletions(-) diff --git a/README.md b/README.md index 2f2e9ba0..86e29e81 100644 --- a/README.md +++ b/README.md @@ -52,8 +52,7 @@ A local index exporter allows you to export the top N queries to local OpenSearc PUT _cluster/settings { "persistent" : { - "search.insights.top_queries.latency.exporter.type" : "local_index", - "search.insights.top_queries.latency.exporter.config.index" : "YYYY.MM.dd" + "search.insights.top_queries.exporter.type" : "local_index" } } ``` diff --git a/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index 13d62151..d912ccf2 100644 --- a/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -131,21 +131,19 @@ public List> getSettings() { QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE, - QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS, QueryInsightsSettings.TOP_N_CPU_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE, QueryInsightsSettings.TOP_N_CPU_QUERIES_WINDOW_SIZE, - QueryInsightsSettings.TOP_N_CPU_EXPORTER_SETTINGS, QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE, QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE, - QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS, QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY, QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N, QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME, QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE, QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING, QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER, + QueryInsightsSettings.TOP_N_EXPORTER_TYPE, QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY ); } diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java index d63f19cb..2fbfa5b9 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java @@ -21,12 +21,18 @@ public final class DebugExporter implements QueryInsightsExporter { * Logger of the debug exporter */ private final Logger logger = LogManager.getLogger(); + private static final String EXPORTER_ID = "debug_exporter"; /** * Constructor of DebugExporter */ private DebugExporter() {} + @Override + public String getId() { + return EXPORTER_ID; + } + private static class InstanceHolder { private static final DebugExporter INSTANCE = new DebugExporter(); } diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java index a5085fe1..cdc462fd 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java @@ -46,6 +46,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter { private final Client client; private DateTimeFormatter indexPattern; private int deleteAfter; + private final String id; /** * Constructor of LocalIndexExporter @@ -53,10 +54,16 @@ public final class LocalIndexExporter implements QueryInsightsExporter { * @param client OS client * @param indexPattern the pattern of index to export to */ - public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) { + public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern, final String id) { this.indexPattern = indexPattern; this.client = client; this.deleteAfter = DEFAULT_DELETE_AFTER_VALUE; + this.id = id; + } + + @Override + public String getId() { + return id; } /** diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java index d50f23c3..b9ff0ab6 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java @@ -22,4 +22,6 @@ public interface QueryInsightsExporter extends Closeable { * @param records list of {@link SearchQueryRecord} */ void export(final List records); + + String getId(); } diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java index 3a2c2fa7..4c4a6cae 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java @@ -8,18 +8,14 @@ package org.opensearch.plugin.insights.core.exporter; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE; - import java.io.IOException; import java.time.format.DateTimeFormatter; -import java.util.HashSet; +import java.util.HashMap; import java.util.Locale; -import java.util.Set; +import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.Client; -import org.opensearch.common.settings.Settings; import org.opensearch.plugin.insights.core.metrics.OperationalMetric; import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter; @@ -32,7 +28,7 @@ public class QueryInsightsExporterFactory { */ private final Logger logger = LogManager.getLogger(); final private Client client; - final private Set exporters; + final private Map exporters; /** * Constructor of QueryInsightsExporterFactory @@ -41,32 +37,26 @@ public class QueryInsightsExporterFactory { */ public QueryInsightsExporterFactory(final Client client) { this.client = client; - this.exporters = new HashSet<>(); + this.exporters = new HashMap<>(); } /** * Validate exporter sink config * - * @param settings exporter sink config {@link Settings} + * @param exporterType exporter sink type * @throws IllegalArgumentException if provided exporter sink config settings are invalid */ - public void validateExporterConfig(final Settings settings) throws IllegalArgumentException { + public void validateExporterType(final String exporterType) throws IllegalArgumentException { // Disable exporter if the EXPORTER_TYPE setting is null - if (settings.get(EXPORTER_TYPE) == null) { + if (exporterType == null) { return; } - SinkType type; try { - type = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)); + SinkType.parse(exporterType); } catch (IllegalArgumentException e) { OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.INVALID_EXPORTER_TYPE_FAILURES); throw new IllegalArgumentException( - String.format( - Locale.ROOT, - "Invalid exporter type [%s], type should be one of %s", - settings.get(EXPORTER_TYPE), - SinkType.allSinkTypes() - ) + String.format(Locale.ROOT, "Invalid exporter type [%s], type should be one of %s", exporterType, SinkType.allSinkTypes()) ); } } @@ -78,10 +68,10 @@ public void validateExporterConfig(final Settings settings) throws IllegalArgume * @param indexPattern the index pattern if creating a index exporter * @return QueryInsightsExporter the created exporter sink */ - public QueryInsightsExporter createExporter(SinkType type, String indexPattern) { + public QueryInsightsExporter createExporter(String id, SinkType type, String indexPattern) { if (SinkType.LOCAL_INDEX.equals(type)) { - QueryInsightsExporter exporter = new LocalIndexExporter(client, DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT)); - this.exporters.add(exporter); + QueryInsightsExporter exporter = new LocalIndexExporter(client, DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT), id); + this.exporters.put(id, exporter); return exporter; } return DebugExporter.getInstance(); @@ -101,6 +91,15 @@ public QueryInsightsExporter updateExporter(QueryInsightsExporter exporter, Stri return exporter; } + /** + * Get a exporter by id + * @param id The id of the exporter + * @return QueryInsightsReader the Reader + */ + public QueryInsightsExporter getExporter(String id) { + return this.exporters.get(id); + } + /** * Close an exporter * @@ -110,7 +109,7 @@ public QueryInsightsExporter updateExporter(QueryInsightsExporter exporter, Stri public void closeExporter(QueryInsightsExporter exporter) throws IOException { if (exporter != null) { exporter.close(); - this.exporters.remove(exporter); + this.exporters.remove(exporter.getId()); } } @@ -119,7 +118,7 @@ public void closeExporter(QueryInsightsExporter exporter) throws IOException { * */ public void closeAllExporters() { - for (QueryInsightsExporter exporter : exporters) { + for (QueryInsightsExporter exporter : exporters.values()) { try { closeExporter(exporter); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java index c90c9c76..ded7cc35 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java @@ -17,6 +17,8 @@ * Type of supported sinks */ public enum SinkType { + /** no exporter */ + NONE("none"), /** debug exporter */ DEBUG("debug"), /** local index exporter */ @@ -60,7 +62,9 @@ public static Set allSinkTypes() { public static SinkType getSinkTypeFromExporter(QueryInsightsExporter exporter) { if (exporter.getClass().equals(LocalIndexExporter.class)) { return SinkType.LOCAL_INDEX; + } else if (exporter.getClass().equals(DebugExporter.class)) { + return SinkType.DEBUG; } - return SinkType.DEBUG; + return SinkType.NONE; } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java index 6969b021..7abc9b7a 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java @@ -46,6 +46,7 @@ public final class LocalIndexReader implements QueryInsightsReader { private final Client client; private DateTimeFormatter indexPattern; private final NamedXContentRegistry namedXContentRegistry; + private final String id; /** * Constructor of LocalIndexReader @@ -54,12 +55,23 @@ public final class LocalIndexReader implements QueryInsightsReader { * @param indexPattern the pattern of index to read from * @param namedXContentRegistry for parsing purposes */ - public LocalIndexReader(final Client client, final DateTimeFormatter indexPattern, final NamedXContentRegistry namedXContentRegistry) { + public LocalIndexReader( + final Client client, + final DateTimeFormatter indexPattern, + final NamedXContentRegistry namedXContentRegistry, + final String id + ) { this.indexPattern = indexPattern; this.client = client; + this.id = id; this.namedXContentRegistry = namedXContentRegistry; } + @Override + public String getId() { + return id; + } + /** * Getter of indexPattern * diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReader.java b/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReader.java index 6e96a96e..2cc7434d 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReader.java +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReader.java @@ -25,4 +25,6 @@ public interface QueryInsightsReader extends Closeable { * @return List of SearchQueryRecord */ List read(final String from, final String to, final String id); + + String getId(); } diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java b/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java index 0f3c2701..aa7fff4d 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java @@ -10,9 +10,9 @@ import java.io.IOException; import java.time.format.DateTimeFormatter; -import java.util.HashSet; +import java.util.HashMap; import java.util.Locale; -import java.util.Set; +import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.Client; @@ -27,7 +27,7 @@ public class QueryInsightsReaderFactory { */ private final Logger logger = LogManager.getLogger(); final private Client client; - final private Set Readers; + final private Map readers; /** * Constructor of QueryInsightsReaderFactory @@ -36,7 +36,7 @@ public class QueryInsightsReaderFactory { */ public QueryInsightsReaderFactory(final Client client) { this.client = client; - this.Readers = new HashSet<>(); + this.readers = new HashMap<>(); } /** @@ -46,39 +46,49 @@ public QueryInsightsReaderFactory(final Client client) { * @param namedXContentRegistry for parsing purposes * @return QueryInsightsReader the created Reader */ - public QueryInsightsReader createReader(String indexPattern, NamedXContentRegistry namedXContentRegistry) { - QueryInsightsReader Reader = new LocalIndexReader( + public QueryInsightsReader createReader(String id, String indexPattern, NamedXContentRegistry namedXContentRegistry) { + QueryInsightsReader reader = new LocalIndexReader( client, DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT), - namedXContentRegistry + namedXContentRegistry, + id ); - this.Readers.add(Reader); - return Reader; + this.readers.put(id, reader); + return reader; } /** - * Update a Reader based on provided parameters + * Update a reader based on provided parameters * - * @param Reader The Reader to update - * @param indexPattern the index pattern if creating an index Reader - * @return QueryInsightsReader the updated Reader sink + * @param reader The reader to update + * @param indexPattern the index pattern if creating an index reader + * @return QueryInsightsReader the updated reader sink */ - public QueryInsightsReader updateReader(QueryInsightsReader Reader, String indexPattern) { - if (Reader.getClass() == LocalIndexReader.class) { - ((LocalIndexReader) Reader).setIndexPattern(DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT)); + public QueryInsightsReader updateReader(QueryInsightsReader reader, String indexPattern) { + if (reader.getClass() == LocalIndexReader.class) { + ((LocalIndexReader) reader).setIndexPattern(DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT)); } - return Reader; + return reader; + } + + /** + * Get a reader by id + * @param id The id of the reader + * @return QueryInsightsReader the Reader + */ + public QueryInsightsReader getReader(String id) { + return this.readers.get(id); } /** - * Close a Reader + * Close a reader * - * @param Reader the Reader to close + * @param reader the Reader to close */ - public void closeReader(QueryInsightsReader Reader) throws IOException { - if (Reader != null) { - Reader.close(); - this.Readers.remove(Reader); + public void closeReader(QueryInsightsReader reader) throws IOException { + if (reader != null) { + reader.close(); + this.readers.remove(reader.getId()); } } @@ -87,11 +97,11 @@ public void closeReader(QueryInsightsReader Reader) throws IOException { * */ public void closeAllReaders() { - for (QueryInsightsReader Reader : Readers) { + for (QueryInsightsReader reader : readers.values()) { try { - closeReader(Reader); + closeReader(reader); } catch (IOException e) { - logger.error("Fail to close query insights Reader, error: ", e); + logger.error("Fail to close query insights reader, error: ", e); } } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 808e4ff5..3d63e41f 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -8,10 +8,15 @@ package org.opensearch.plugin.insights.core.service; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_READER_ID; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.deleteSingleIndex; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_TYPE; import java.io.IOException; import java.util.ArrayList; @@ -31,12 +36,15 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; -import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.plugin.insights.core.exporter.LocalIndexExporter; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; +import org.opensearch.plugin.insights.core.exporter.SinkType; import org.opensearch.plugin.insights.core.metrics.OperationalMetric; import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter; +import org.opensearch.plugin.insights.core.reader.QueryInsightsReader; import org.opensearch.plugin.insights.core.reader.QueryInsightsReaderFactory; import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator; import org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer; @@ -87,12 +95,12 @@ public class QueryInsightsService extends AbstractLifecycleComponent { protected volatile List scheduledFutures; /** - * Query Insights exporter factory + * Factory for validating and creating exporters */ final QueryInsightsExporterFactory queryInsightsExporterFactory; /** - * Query Insights reader factory + * Factory for validating and creating readers */ final QueryInsightsReaderFactory queryInsightsReaderFactory; @@ -112,6 +120,8 @@ public class QueryInsightsService extends AbstractLifecycleComponent { */ private QueryShapeGenerator queryShapeGenerator; + private final Client client; + /** * Constructor of the QueryInsightsService * @@ -136,6 +146,7 @@ public QueryInsightsService( this.queryInsightsExporterFactory = new QueryInsightsExporterFactory(client); this.queryInsightsReaderFactory = new QueryInsightsReaderFactory(client); this.namedXContentRegistry = namedXContentRegistry; + this.client = client; // initialize top n queries services and configurations consumers topQueriesServices = new HashMap<>(); for (MetricType metricType : MetricType.allMetricTypes()) { @@ -145,20 +156,18 @@ public QueryInsightsService( new TopQueriesService(client, metricType, threadPool, queryInsightsExporterFactory, queryInsightsReaderFactory) ); } - for (MetricType type : MetricType.allMetricTypes()) { - clusterService.getClusterSettings() - .addSettingsUpdateConsumer( - getExporterSettings(type), - (settings -> setExporterAndReader(type, settings, clusterService.state().metadata().indices())), - (settings -> validateExporterAndReaderConfig(type, settings)) - ); - clusterService.getClusterSettings() - .addSettingsUpdateConsumer( - TOP_N_EXPORTER_DELETE_AFTER, - (settings -> setExporterDeleteAfterAndDelete(type, settings)), - (TopQueriesService::validateExporterDeleteAfter) - ); - } + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + TOP_N_EXPORTER_TYPE, + (v -> setExporterAndReader(SinkType.parse(v), clusterService.state().metadata().indices())), + (this::validateExporterType) + ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + TOP_N_EXPORTER_DELETE_AFTER, + (this::setExporterDeleteAfterAndDelete), + (TopQueriesService::validateExporterDeleteAfter) + ); this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry); this.enableSearchQueryMetricsFeature(false); @@ -407,28 +416,67 @@ public void setTopNSize(final MetricType type, final int topNSize) { /** * Set the exporter and reader config for a metricType * - * @param type {@link MetricType} - * @param settings exporter and reader settings + * @param sinkType {@link SinkType} + * @param indexMetadataMap index metadata map in the current cluster */ - private void setExporterAndReader(final MetricType type, final Settings settings, final Map indexMetadataMap) { - if (topQueriesServices.containsKey(type)) { - TopQueriesService tqs = topQueriesServices.get(type); - tqs.setExporter(settings, indexMetadataMap); - tqs.setReader(settings, namedXContentRegistry); + private void setExporterAndReader(final SinkType sinkType, final Map indexMetadataMap) { + final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID); + + // This method is invoked when sink type is changed + // Clear local indices if exporter is of type LocalIndexExporter + if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) { + deleteAllTopNIndices(client, indexMetadataMap); + } + + if (sinkType != null) { + if (topQueriesExporter != null && sinkType == SinkType.getSinkTypeFromExporter(topQueriesExporter)) { + // this won't happen since we disallowed users to change index patterns. + // But leaving the hook here since we will add support for more sinks and configurations in the future. + queryInsightsExporterFactory.updateExporter(topQueriesExporter, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN); + } else { + try { + queryInsightsExporterFactory.closeExporter(topQueriesExporter); + } catch (IOException e) { + OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.EXPORTER_FAIL_TO_CLOSE_EXCEPTION); + logger.error("Fail to close the current exporter when updating exporter, error: ", e); + } + // this is a new exporter, create it for all underlying services. + queryInsightsExporterFactory.createExporter( + TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID, + sinkType, + DEFAULT_TOP_N_QUERIES_INDEX_PATTERN + ); + } + } else { + // Disable exporter if exporter type is set to null + try { + queryInsightsExporterFactory.closeExporter(topQueriesExporter); + } catch (IOException e) { + OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.EXPORTER_FAIL_TO_CLOSE_EXCEPTION); + logger.error("Fail to close the current exporter when disabling exporter, error: ", e); + } } + + // set up reader for top n queries service + final QueryInsightsReader reader = queryInsightsReaderFactory.createReader( + TOP_QUERIES_LOCAL_INDEX_READER_ID, + DEFAULT_TOP_N_QUERIES_INDEX_PATTERN, + namedXContentRegistry + ); + queryInsightsReaderFactory.updateReader(reader, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN); } /** * Set the exporter delete after, then delete expired Top N indices * - * @param type {@link MetricType} * @param deleteAfter the number of days after which Top N local indices should be deleted */ - private void setExporterDeleteAfterAndDelete(final MetricType type, final int deleteAfter) { - if (topQueriesServices.containsKey(type)) { - topQueriesServices.get(type).setExporterDeleteAfter(deleteAfter); - deleteExpiredTopNIndices(); + private void setExporterDeleteAfterAndDelete(final int deleteAfter) { + final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID); + if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) { + ((LocalIndexExporter) topQueriesExporter).setDeleteAfter(deleteAfter); } + deleteExpiredTopNIndices(); } /** @@ -440,16 +488,12 @@ public SearchQueryCategorizer getSearchQueryCategorizer() { } /** - * Validate the exporter and reader config for a metricType + * Validate the exporter type config * - * @param type {@link MetricType} - * @param settings exporter and reader settings + * @param exporterType exporter type */ - public void validateExporterAndReaderConfig(final MetricType type, final Settings settings) { - if (topQueriesServices.containsKey(type)) { - TopQueriesService tqs = topQueriesServices.get(type); - tqs.validateExporterAndReaderConfig(settings); - } + public void validateExporterType(final String exporterType) { + queryInsightsExporterFactory.validateExporterType(exporterType); } @Override @@ -519,11 +563,27 @@ public QueryInsightsHealthStats getHealthStats() { * Delete Top N local indices older than the configured data retention period */ private void deleteExpiredTopNIndices() { - for (MetricType metricType : MetricType.allMetricTypes()) { - topQueriesServices.get(metricType).deleteExpiredTopNIndices(clusterService.state().metadata().indices()); + final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID); + if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) { + threadPool.executor(QUERY_INSIGHTS_EXECUTOR) + .execute( + () -> ((LocalIndexExporter) topQueriesExporter).deleteExpiredTopNIndices(clusterService.state().metadata().indices()) + ); } } + /** + * Deletes all Top N local indices + * + * @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata} + */ + void deleteAllTopNIndices(final Client client, final Map indexMetadataMap) { + indexMetadataMap.entrySet() + .stream() + .filter(entry -> isTopQueriesIndex(entry.getKey())) + .forEach(entry -> deleteSingleIndex(entry.getKey(), client)); + } + /** * Set query shape generator */ diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java index b99767c3..f802213d 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -8,9 +8,6 @@ package org.opensearch.plugin.insights.core.service; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MAX_DELETE_AFTER_VALUE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MIN_DELETE_AFTER_VALUE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR; @@ -40,15 +37,10 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.client.Client; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.plugin.insights.core.exporter.LocalIndexExporter; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; -import org.opensearch.plugin.insights.core.exporter.SinkType; import org.opensearch.plugin.insights.core.metrics.OperationalMetric; import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter; import org.opensearch.plugin.insights.core.reader.QueryInsightsReader; @@ -70,6 +62,8 @@ * with high latency or resource usage */ public class TopQueriesService { + public static final String TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID = "top_queries_local_index_exporter"; + public static final String TOP_QUERIES_LOCAL_INDEX_READER_ID = "top_queries_local_index_reader"; private static final String METRIC_TYPE_TAG = "metric_type"; private static final String GROUPBY_TAG = "groupby"; @@ -122,12 +116,6 @@ public class TopQueriesService { */ private final ThreadPool threadPool; - /** - * Exporter for exporting top queries data - */ - private QueryInsightsExporter exporter; - private QueryInsightsReader reader; - private final QueryGrouper queryGrouper; TopQueriesService( @@ -135,7 +123,7 @@ public class TopQueriesService { final MetricType metricType, final ThreadPool threadPool, final QueryInsightsExporterFactory queryInsightsExporterFactory, - QueryInsightsReaderFactory queryInsightsReaderFactory + final QueryInsightsReaderFactory queryInsightsReaderFactory ) { this.enabled = false; this.client = client; @@ -146,8 +134,6 @@ public class TopQueriesService { this.topNSize = QueryInsightsSettings.DEFAULT_TOP_N_SIZE; this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE; this.windowStart = -1L; - this.exporter = null; - this.reader = null; topQueriesStore = new PriorityBlockingQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType)); topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>()); topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>()); @@ -269,66 +255,6 @@ public void validateWindowSize(final TimeValue windowSize) { } } - /** - * Set up the top queries exporter based on provided settings - * - * @param settings exporter config {@link Settings} - */ - public void setExporter(final Settings settings, final Map indexMetadataMap) { - // This method is invoked when sink type is changed - // Clear local indices if exporter is of type LocalIndexExporter - if (exporter != null && exporter.getClass() == LocalIndexExporter.class) { - deleteAllTopNIndices(indexMetadataMap); - } - - if (settings.get(EXPORTER_TYPE) != null) { - SinkType expectedType = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)); - if (exporter != null && expectedType == SinkType.getSinkTypeFromExporter(exporter)) { - queryInsightsExporterFactory.updateExporter(exporter, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN); - } else { - try { - queryInsightsExporterFactory.closeExporter(this.exporter); - } catch (IOException e) { - OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.EXPORTER_FAIL_TO_CLOSE_EXCEPTION); - logger.error("Fail to close the current exporter when updating exporter, error: ", e); - } - this.exporter = queryInsightsExporterFactory.createExporter( - SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)), - DEFAULT_TOP_N_QUERIES_INDEX_PATTERN - ); - } - } else { - // Disable exporter if exporter type is set to null - try { - queryInsightsExporterFactory.closeExporter(this.exporter); - this.exporter = null; - } catch (IOException e) { - OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.EXPORTER_FAIL_TO_CLOSE_EXCEPTION); - logger.error("Fail to close the current exporter when disabling exporter, error: ", e); - } - } - } - - /** - * Set up the top queries reader based on provided settings - * - * @param settings reader config {@link Settings} - * @param namedXContentRegistry NamedXContentRegistry for parsing purposes - */ - public void setReader(final Settings settings, final NamedXContentRegistry namedXContentRegistry) { - this.reader = queryInsightsReaderFactory.createReader(DEFAULT_TOP_N_QUERIES_INDEX_PATTERN, namedXContentRegistry); - queryInsightsReaderFactory.updateReader(reader, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN); - } - - /** - * Validate provided settings for top queries exporter and reader - * - * @param settings settings exporter/reader config {@link Settings} - */ - public void validateExporterAndReaderConfig(Settings settings) { - queryInsightsExporterFactory.validateExporterConfig(settings); - } - /** * Lambda function to mark if a record is internal */ @@ -426,6 +352,7 @@ public List getTopQueriesRecordsFromIndex(final String from, } final List queries = new ArrayList<>(); + final QueryInsightsReader reader = queryInsightsReaderFactory.getReader(TOP_QUERIES_LOCAL_INDEX_READER_ID); if (reader != null) { try { final ZonedDateTime start = ZonedDateTime.parse(from); @@ -515,6 +442,7 @@ private void rotateWindowIfNecessary(final long newWindowStart) { topQueriesCurrentSnapshot.set(new ArrayList<>()); windowStart = newWindowStart; // export to the configured sink + QueryInsightsExporter exporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID); if (exporter != null) { threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> exporter.export(history)); } @@ -548,10 +476,7 @@ public List getTopQueriesCurrentSnapshot() { * Close the top n queries service * @throws IOException exception */ - public void close() throws IOException { - queryInsightsExporterFactory.closeExporter(this.exporter); - queryInsightsReaderFactory.closeReader(this.reader); - } + public void close() throws IOException {} /** * Drain internal stores. @@ -591,39 +516,6 @@ static void validateExporterDeleteAfter(final int deleteAfter) { } } - /** - * Set exporter delete after if exporter is a {@link LocalIndexExporter} - * - * @param deleteAfter the number of days after which Top N local indices should be deleted - */ - void setExporterDeleteAfter(final int deleteAfter) { - if (exporter != null && exporter.getClass() == LocalIndexExporter.class) { - ((LocalIndexExporter) exporter).setDeleteAfter(deleteAfter); - } - } - - /** - * Delete Top N local indices older than the configured data retention period - */ - void deleteExpiredTopNIndices(final Map indexMetadataMap) { - if (exporter != null && exporter.getClass() == LocalIndexExporter.class) { - threadPool.executor(QUERY_INSIGHTS_EXECUTOR) - .execute(() -> ((LocalIndexExporter) exporter).deleteExpiredTopNIndices(indexMetadataMap)); - } - } - - /** - * Deletes all Top N local indices - * - * @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata} - */ - void deleteAllTopNIndices(final Map indexMetadataMap) { - indexMetadataMap.entrySet() - .stream() - .filter(entry -> isTopQueriesIndex(entry.getKey())) - .forEach(entry -> deleteSingleIndex(entry.getKey(), client)); - } - /** * Deletes the specified index and logs any failure that occurs during the operation. * diff --git a/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java index 204f5f33..ecff6a9b 100644 --- a/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java +++ b/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -13,7 +13,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.plugin.insights.core.exporter.SinkType; import org.opensearch.plugin.insights.rules.model.GroupingType; @@ -222,15 +221,7 @@ public class QueryInsightsSettings { /** * Settings and defaults for top queries exporters */ - private static final String TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX = TOP_N_LATENCY_QUERIES_PREFIX + ".exporter."; - /** - * Prefix for top n queries by cpu exporters - */ - private static final String TOP_N_CPU_QUERIES_EXPORTER_PREFIX = TOP_N_CPU_QUERIES_PREFIX + ".exporter."; - /** - * Prefix for top n queries by memory exporters - */ - private static final String TOP_N_MEMORY_QUERIES_EXPORTER_PREFIX = TOP_N_MEMORY_QUERIES_PREFIX + ".exporter."; + private static final String TOP_N_QUERIES_EXPORTER_PREFIX = TOP_N_QUERIES_SETTING_PREFIX + ".exporter"; /** * Default index pattern of top n queries */ @@ -238,7 +229,7 @@ public class QueryInsightsSettings { /** * Default exporter type of top queries */ - public static final String DEFAULT_TOP_QUERIES_EXPORTER_TYPE = SinkType.LOCAL_INDEX.toString(); + public static final String DEFAULT_TOP_QUERIES_EXPORTER_TYPE = SinkType.NONE.toString(); /** * Default Top N local indices retention period in days */ @@ -259,37 +250,20 @@ public class QueryInsightsSettings { * and it applies to exporters of all metric types */ public static final Setting TOP_N_EXPORTER_DELETE_AFTER = Setting.intSetting( - TOP_N_QUERIES_SETTING_PREFIX + ".delete_after_days", + TOP_N_QUERIES_EXPORTER_PREFIX + ".delete_after_days", DEFAULT_DELETE_AFTER_VALUE, Setting.Property.Dynamic, Setting.Property.NodeScope ); /** - * Settings for the exporter of top latency queries - */ - public static final Setting TOP_N_LATENCY_EXPORTER_SETTINGS = Setting.groupSetting( - TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Settings for the exporter of top cpu queries - */ - public static final Setting TOP_N_CPU_EXPORTER_SETTINGS = Setting.groupSetting( - TOP_N_CPU_QUERIES_EXPORTER_PREFIX, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Settings for the exporter of top cpu queries + * Settings for the top n queries exporter */ - public static final Setting TOP_N_MEMORY_EXPORTER_SETTINGS = Setting.groupSetting( - TOP_N_MEMORY_QUERIES_EXPORTER_PREFIX, - Setting.Property.Dynamic, - Setting.Property.NodeScope + public static final Setting TOP_N_EXPORTER_TYPE = Setting.simpleString( + TOP_N_QUERIES_EXPORTER_PREFIX + ".type", + DEFAULT_TOP_QUERIES_EXPORTER_TYPE, + Setting.Property.NodeScope, + Setting.Property.Dynamic ); /** @@ -340,22 +314,6 @@ public static Setting getTopNWindowSizeSetting(MetricType type) { } } - /** - * Get the exporter settings based on type - * @param type MetricType - * @return exporter setting - */ - public static Setting getExporterSettings(MetricType type) { - switch (type) { - case CPU: - return TOP_N_CPU_EXPORTER_SETTINGS; - case MEMORY: - return TOP_N_MEMORY_EXPORTER_SETTINGS; - default: - return TOP_N_LATENCY_EXPORTER_SETTINGS; - } - } - /** * Default constructor */ diff --git a/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java b/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java index 8d8dad0b..dacf64e7 100644 --- a/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java +++ b/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java @@ -68,21 +68,19 @@ public void testGetSettings() { QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE, - QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS, QueryInsightsSettings.TOP_N_CPU_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE, QueryInsightsSettings.TOP_N_CPU_QUERIES_WINDOW_SIZE, - QueryInsightsSettings.TOP_N_CPU_EXPORTER_SETTINGS, QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE, QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE, - QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS, QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY, QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N, QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME, QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE, QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING, QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER, + QueryInsightsSettings.TOP_N_EXPORTER_TYPE, QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY ), queryInsightsPlugin.getSettings() diff --git a/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java index 886238f9..18f570e6 100644 --- a/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java +++ b/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -342,15 +342,13 @@ public static void registerAllQueryInsightsSettings(ClusterSettings clusterSetti clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_QUERIES_WINDOW_SIZE); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_EXPORTER_SETTINGS); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_EXPORTER_TYPE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME); diff --git a/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java b/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java index 7e5db95c..00ff6d34 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java @@ -54,7 +54,7 @@ public class LocalIndexExporterTests extends OpenSearchTestCase { @Before public void setup() { - localIndexExporter = new LocalIndexExporter(client, format); + localIndexExporter = new LocalIndexExporter(client, format, "id"); when(client.admin()).thenReturn(adminClient); when(adminClient.indices()).thenReturn(indicesAdminClient); diff --git a/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java b/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java index 99c96280..c1b40030 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java @@ -11,13 +11,11 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE; import java.time.format.DateTimeFormatter; import java.util.Locale; import org.junit.Before; import org.opensearch.client.Client; -import org.opensearch.common.settings.Settings; import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter; import org.opensearch.telemetry.metrics.Counter; import org.opensearch.telemetry.metrics.MetricsRegistry; @@ -44,28 +42,24 @@ public void setup() { } public void testValidateConfigWhenResetExporter() { - Settings.Builder settingsBuilder = Settings.builder(); - // empty settings - Settings settings = settingsBuilder.build(); try { - queryInsightsExporterFactory.validateExporterConfig(settings); + // empty settings + queryInsightsExporterFactory.validateExporterType(null); } catch (Exception e) { fail("No exception should be thrown when setting is null"); } } public void testInvalidExporterTypeConfig() { - Settings.Builder settingsBuilder = Settings.builder(); - Settings settings = settingsBuilder.put(EXPORTER_TYPE, "some_invalid_type").build(); - assertThrows(IllegalArgumentException.class, () -> { queryInsightsExporterFactory.validateExporterConfig(settings); }); + assertThrows(IllegalArgumentException.class, () -> { queryInsightsExporterFactory.validateExporterType("some_invalid_type"); }); } public void testCreateAndCloseExporter() { - QueryInsightsExporter exporter1 = queryInsightsExporterFactory.createExporter(SinkType.LOCAL_INDEX, format); + QueryInsightsExporter exporter1 = queryInsightsExporterFactory.createExporter("id", SinkType.LOCAL_INDEX, format); assertTrue(exporter1 instanceof LocalIndexExporter); - QueryInsightsExporter exporter2 = queryInsightsExporterFactory.createExporter(SinkType.DEBUG, format); + QueryInsightsExporter exporter2 = queryInsightsExporterFactory.createExporter("id", SinkType.DEBUG, format); assertTrue(exporter2 instanceof DebugExporter); - QueryInsightsExporter exporter3 = queryInsightsExporterFactory.createExporter(SinkType.DEBUG, format); + QueryInsightsExporter exporter3 = queryInsightsExporterFactory.createExporter("id", SinkType.DEBUG, format); assertTrue(exporter3 instanceof DebugExporter); try { queryInsightsExporterFactory.closeExporter(exporter1); @@ -77,7 +71,7 @@ public void testCreateAndCloseExporter() { } public void testUpdateExporter() { - LocalIndexExporter exporter = new LocalIndexExporter(client, DateTimeFormatter.ofPattern(format, Locale.ROOT)); + LocalIndexExporter exporter = new LocalIndexExporter(client, DateTimeFormatter.ofPattern(format, Locale.ROOT), "id"); queryInsightsExporterFactory.updateExporter(exporter, "yyyy-MM-dd-HH"); assertEquals(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH", Locale.ROOT).toString(), exporter.getIndexPattern().toString()); } diff --git a/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterIT.java b/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterIT.java index d44a5c4b..77fd39ef 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterIT.java +++ b/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterIT.java @@ -45,23 +45,18 @@ public void testQueryInsightsExporterSettings() throws IOException { private String defaultExporterSettings() { return "{\n" + " \"persistent\" : {\n" - + " \"search.insights.top_queries.latency.exporter.config.index\" : \"YYYY.MM.dd\",\n" - + " \"search.insights.top_queries.latency.exporter.type\" : \"local_index\"\n" + + " \"search.insights.top_queries.exporter.type\" : \"local_index\"\n" + " }\n" + "}"; } private String[] invalidExporterSettings() { return new String[] { + "{\n" + " \"persistent\" : {\n" + " \"search.insights.top_queries.exporter.type\" : invalid_type\n" + " }\n" + "}", "{\n" + " \"persistent\" : {\n" - + " \"search.insights.top_queries.latency.exporter.type\" : invalid_type\n" - + " }\n" - + "}", - "{\n" - + " \"persistent\" : {\n" - + " \"search.insights.top_queries.latency.exporter.type\" : local_index,\n" - + " \"search.insights.top_queries.latency.exporter.config.index\" : \"1a2b\"\n" + + " \"search.insights.top_queries.exporter.type\" : local_index,\n" + + " \"search.insights.top_queries.exporter.config.index\" : \"1a2b\"\n" + " }\n" + "}" }; } diff --git a/src/test/java/org/opensearch/plugin/insights/core/reader/LocalIndexReaderTests.java b/src/test/java/org/opensearch/plugin/insights/core/reader/LocalIndexReaderTests.java index 63a9ed0a..f9332edd 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/reader/LocalIndexReaderTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/reader/LocalIndexReaderTests.java @@ -48,7 +48,7 @@ public class LocalIndexReaderTests extends OpenSearchTestCase { @Before public void setup() { - localIndexReader = new LocalIndexReader(client, format, namedXContentRegistry); + localIndexReader = new LocalIndexReader(client, format, namedXContentRegistry, "id"); } @SuppressWarnings("unchecked") diff --git a/src/test/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactoryTests.java b/src/test/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactoryTests.java index 74adcac2..2a3537eb 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactoryTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactoryTests.java @@ -45,7 +45,7 @@ public void setup() { } public void testCreateAndCloseReader() { - QueryInsightsReader reader1 = queryInsightsReaderFactory.createReader(format, namedXContentRegistry); + QueryInsightsReader reader1 = queryInsightsReaderFactory.createReader("id", format, namedXContentRegistry); assertTrue(reader1 instanceof LocalIndexReader); try { queryInsightsReaderFactory.closeReader(reader1); @@ -56,7 +56,12 @@ public void testCreateAndCloseReader() { } public void testUpdateReader() { - LocalIndexReader reader = new LocalIndexReader(client, DateTimeFormatter.ofPattern(format, Locale.ROOT), namedXContentRegistry); + LocalIndexReader reader = new LocalIndexReader( + client, + DateTimeFormatter.ofPattern(format, Locale.ROOT), + namedXContentRegistry, + "id" + ); queryInsightsReaderFactory.updateReader(reader, "yyyy-MM-dd-HH"); assertEquals(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH", Locale.ROOT).toString(), reader.getIndexPattern().toString()); } diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java index 61631d43..ba59bec2 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -11,18 +11,29 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; +import static org.opensearch.plugin.insights.core.exporter.LocalIndexExporter.generateLocalIndexDateHash; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.ENTRY_COUNT; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.EVICTIONS; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.HIT_COUNT; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.MISS_COUNT; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.SIZE_IN_BYTES; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.client.AdminClient; import org.opensearch.client.Client; +import org.opensearch.client.IndicesAdminClient; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -54,6 +65,8 @@ public class QueryInsightsServiceTests extends OpenSearchTestCase { private final NamedXContentRegistry namedXContentRegistry = mock(NamedXContentRegistry.class); private QueryInsightsService queryInsightsService; private QueryInsightsService queryInsightsServiceSpy; + private final AdminClient adminClient = mock(AdminClient.class); + private final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); @Before public void setup() { @@ -84,6 +97,9 @@ public void setup() { invocation -> mock(Counter.class) ); OperationalMetricsCounter.initialize("cluster", metricsRegistry); + + when(client.admin()).thenReturn(adminClient); + when(adminClient.indices()).thenReturn(indicesAdminClient); } @Override @@ -224,4 +240,46 @@ public void testGetHealthStats() { assertTrue(fieldTypeCacheStats.containsKey(HIT_COUNT)); assertTrue(fieldTypeCacheStats.containsKey(MISS_COUNT)); } + + public void testDeleteAllTopNIndices() { + // Create 9 top_queries-* indices + Map indexMetadataMap = new HashMap<>(); + for (int i = 1; i < 10; i++) { + String indexName = "top_queries-2024.01.0" + i + "-" + generateLocalIndexDateHash(); + long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(SETTING_CREATION_DATE, creationTime) + ) + .build(); + indexMetadataMap.put(indexName, indexMetadata); + } + // Create 5 user indices + for (int i = 0; i < 5; i++) { + String indexName = "my_index-" + i; + long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(SETTING_CREATION_DATE, creationTime) + ) + .build(); + indexMetadataMap.put(indexName, indexMetadata); + } + + queryInsightsService.deleteAllTopNIndices(client, indexMetadataMap); + // All 10 indices should be deleted + verify(client, times(9)).admin(); + verify(adminClient, times(9)).indices(); + verify(indicesAdminClient, times(9)).delete(any(), any()); + } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java index 69bc3461..27d2abc9 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java @@ -10,29 +10,18 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; -import static org.opensearch.plugin.insights.core.exporter.LocalIndexExporter.generateLocalIndexDateHash; import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex; import static org.opensearch.plugin.insights.core.service.TopQueriesService.validateExporterDeleteAfter; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.junit.Before; -import org.opensearch.Version; import org.opensearch.client.AdminClient; import org.opensearch.client.Client; import org.opensearch.client.IndicesAdminClient; import org.opensearch.cluster.coordination.DeterministicTaskQueue; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; @@ -222,48 +211,6 @@ public void testValidateExporterDeleteAfter() { ); } - public void testDeleteAllTopNIndices() { - // Create 9 top_queries-* indices - Map indexMetadataMap = new HashMap<>(); - for (int i = 1; i < 10; i++) { - String indexName = "top_queries-2024.01.0" + i + "-" + generateLocalIndexDateHash(); - long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); - - IndexMetadata indexMetadata = IndexMetadata.builder(indexName) - .settings( - Settings.builder() - .put("index.version.created", Version.CURRENT.id) - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 1) - .put(SETTING_CREATION_DATE, creationTime) - ) - .build(); - indexMetadataMap.put(indexName, indexMetadata); - } - // Create 5 user indices - for (int i = 0; i < 5; i++) { - String indexName = "my_index-" + i; - long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); - - IndexMetadata indexMetadata = IndexMetadata.builder(indexName) - .settings( - Settings.builder() - .put("index.version.created", Version.CURRENT.id) - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 1) - .put(SETTING_CREATION_DATE, creationTime) - ) - .build(); - indexMetadataMap.put(indexName, indexMetadata); - } - - topQueriesService.deleteAllTopNIndices(indexMetadataMap); - // All 10 indices should be delete - verify(client, times(9)).admin(); - verify(adminClient, times(9)).indices(); - verify(indicesAdminClient, times(9)).delete(any(), any()); - } - public void testIsTopQueriesIndex() { assertTrue(isTopQueriesIndex("top_queries-2024.01.01-01234")); assertTrue(isTopQueriesIndex("top_queries-2025.12.12-99999")); From ab95ebfd664b27b2f27895d41c4553b02b2d4110 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Thu, 23 Jan 2025 18:17:59 -0800 Subject: [PATCH 2/7] use index mapping and define default index metadata for top n queries Signed-off-by: Chenyang Ji --- .../core/exporter/LocalIndexExporter.java | 125 ++++++++++++++--- .../QueryInsightsExporterFactory.java | 14 +- .../core/service/QueryInsightsService.java | 3 +- .../mappings/top-queries-record.json | 127 ++++++++++++++++++ 4 files changed, 249 insertions(+), 20 deletions(-) create mode 100644 src/main/resources/mappings/top-queries-record.json diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java index cdc462fd..76452653 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java @@ -11,6 +11,8 @@ import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_DELETE_AFTER_VALUE; +import java.io.IOException; +import java.nio.charset.Charset; import java.time.Instant; import java.time.ZoneOffset; import java.time.ZonedDateTime; @@ -18,14 +20,21 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.bulk.BulkRequestBuilder; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.client.Client; +import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; @@ -44,19 +53,38 @@ public final class LocalIndexExporter implements QueryInsightsExporter { */ private final Logger logger = LogManager.getLogger(); private final Client client; + private final ClusterService clusterService; + private final String indexMapping; private DateTimeFormatter indexPattern; private int deleteAfter; private final String id; + private static final int DEFAULT_NUMBER_OF_REPLICA = 1; + private static final int DEFAULT_NUMBER_OF_SHARDS = 1; + private static final List DEFAULT_SORTED_FIELDS = List.of( + "measurements.latency.number", + "measurements.cpu.number", + "measurements.memory.number" + ); + private static final List DEFAULT_SORTED_ORDERS = List.of( + "desc", + "desc", + "desc" + ); /** * Constructor of LocalIndexExporter * * @param client OS client + * @param clusterService cluster service * @param indexPattern the pattern of index to export to + * @param indexMapping the index mapping file + * @param id id of the exporter */ - public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern, final String id) { + public LocalIndexExporter(final Client client, final ClusterService clusterService, final DateTimeFormatter indexPattern, final String indexMapping, final String id) { this.indexPattern = indexPattern; this.client = client; + this.clusterService = clusterService; + this.indexMapping = indexMapping; this.deleteAfter = DEFAULT_DELETE_AFTER_VALUE; this.id = id; } @@ -96,28 +124,72 @@ public void export(final List records) { } try { final String indexName = buildLocalIndexName(); - final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1)); - for (SearchQueryRecord record : records) { - bulkRequestBuilder.add( - new IndexRequest(indexName).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + if (!checkIndexExists(indexName)) { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); + + createIndexRequest.settings(Settings.builder() + .putList("index.sort.field", DEFAULT_SORTED_FIELDS) + .putList("index.sort.order", DEFAULT_SORTED_ORDERS) + .put("index.number_of_shards", DEFAULT_NUMBER_OF_SHARDS) + .put("index.number_of_replicas", DEFAULT_NUMBER_OF_REPLICA) ); + createIndexRequest.mapping(readIndexMappings()); + + client.admin().indices().create(createIndexRequest, new ActionListener<>() { + @Override + public void onResponse(CreateIndexResponse createIndexResponse) { + if (createIndexResponse.isAcknowledged()) { + try { + bulk(indexName, records); + } catch (IOException e) { + OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS); + logger.error("Unable to index query insights data: ", e); + } + } + } + @Override + public void onFailure(Exception e) { + if (e instanceof ResourceAlreadyExistsException) { + try { + bulk(indexName, records); + } catch (IOException ex) { + OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS); + logger.error("Unable to index query insights data: ", e); + } + } else { + OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS); + logger.error("Unable to create query insights index: ", e); + } + } + }); + } else { + bulk(indexName, records); } - bulkRequestBuilder.execute(new ActionListener() { - @Override - public void onResponse(BulkResponse bulkItemResponses) {} - - @Override - public void onFailure(Exception e) { - OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_BULK_FAILURES); - logger.error("Failed to execute bulk operation for query insights data: ", e); - } - }); } catch (final Exception e) { OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS); logger.error("Unable to index query insights data: ", e); } } + private void bulk(final String indexName, final List records) throws IOException { + final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1)); + for (SearchQueryRecord record : records) { + bulkRequestBuilder.add( + new IndexRequest(indexName).id(record.getId()).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + ); + } + bulkRequestBuilder.execute(new ActionListener() { + @Override + public void onResponse(BulkResponse bulkItemResponses) {} + + @Override + public void onFailure(Exception e) { + OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_BULK_FAILURES); + logger.error("Failed to execute bulk operation for query insights data: ", e); + } + }); + } + /** * Close the exporter sink */ @@ -174,4 +246,27 @@ public static String generateLocalIndexDateHash() { // Generate a 5-digit numeric hash from the date's hashCode return String.format(Locale.ROOT, "%05d", (currentDate.hashCode() % 100000 + 100000) % 100000); } + + /** + * check if index exists + * @return boolean + */ + private boolean checkIndexExists(String indexName) { + ClusterState clusterState = clusterService.state(); + return clusterState.getRoutingTable().hasIndex(indexName); + } + + /** + * get correlation rule index mappings + * @return mappings of correlation rule index + * @throws IOException IOException + */ + private String readIndexMappings() throws IOException { + return new String( + Objects.requireNonNull(LocalIndexExporter.class.getClassLoader().getResourceAsStream(indexMapping)) + .readAllBytes(), + Charset.defaultCharset() + ); + } + } diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java index 4c4a6cae..8146196a 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java @@ -16,6 +16,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.plugin.insights.core.metrics.OperationalMetric; import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter; @@ -28,15 +29,18 @@ public class QueryInsightsExporterFactory { */ private final Logger logger = LogManager.getLogger(); final private Client client; + final private ClusterService clusterService; final private Map exporters; /** * Constructor of QueryInsightsExporterFactory * * @param client OS client + * @param clusterService cluster service */ - public QueryInsightsExporterFactory(final Client client) { + public QueryInsightsExporterFactory(final Client client, final ClusterService clusterService) { this.client = client; + this.clusterService = clusterService; this.exporters = new HashMap<>(); } @@ -64,13 +68,15 @@ public void validateExporterType(final String exporterType) throws IllegalArgume /** * Create an exporter based on provided parameters * + * @param id id of the exporter * @param type The type of exporter to create - * @param indexPattern the index pattern if creating a index exporter + * @param indexPattern the index pattern if creating an index exporter + * @param indexMapping index mapping file * @return QueryInsightsExporter the created exporter sink */ - public QueryInsightsExporter createExporter(String id, SinkType type, String indexPattern) { + public QueryInsightsExporter createExporter(String id, SinkType type, String indexPattern, String indexMapping) { if (SinkType.LOCAL_INDEX.equals(type)) { - QueryInsightsExporter exporter = new LocalIndexExporter(client, DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT), id); + QueryInsightsExporter exporter = new LocalIndexExporter(client, clusterService, DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT), indexMapping, id); this.exporters.put(id, exporter); return exporter; } diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 3d63e41f..5942f778 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -444,7 +444,8 @@ private void setExporterAndReader(final SinkType sinkType, final Map Date: Thu, 23 Jan 2025 18:28:40 -0800 Subject: [PATCH 3/7] improve safegard for deleting top queries indices with index mapping Signed-off-by: Chenyang Ji --- .../core/exporter/LocalIndexExporter.java | 34 ++++++---- .../QueryInsightsExporterFactory.java | 8 ++- .../core/reader/LocalIndexReader.java | 2 +- .../core/service/QueryInsightsService.java | 5 +- .../core/service/TopQueriesService.java | 68 +++++++++++++------ 5 files changed, 76 insertions(+), 41 deletions(-) diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java index 76452653..b1c13699 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java @@ -65,11 +65,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter { "measurements.cpu.number", "measurements.memory.number" ); - private static final List DEFAULT_SORTED_ORDERS = List.of( - "desc", - "desc", - "desc" - ); + private static final List DEFAULT_SORTED_ORDERS = List.of("desc", "desc", "desc"); /** * Constructor of LocalIndexExporter @@ -80,7 +76,13 @@ public final class LocalIndexExporter implements QueryInsightsExporter { * @param indexMapping the index mapping file * @param id id of the exporter */ - public LocalIndexExporter(final Client client, final ClusterService clusterService, final DateTimeFormatter indexPattern, final String indexMapping, final String id) { + public LocalIndexExporter( + final Client client, + final ClusterService clusterService, + final DateTimeFormatter indexPattern, + final String indexMapping, + final String id + ) { this.indexPattern = indexPattern; this.client = client; this.clusterService = clusterService; @@ -127,11 +129,12 @@ public void export(final List records) { if (!checkIndexExists(indexName)) { CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); - createIndexRequest.settings(Settings.builder() - .putList("index.sort.field", DEFAULT_SORTED_FIELDS) - .putList("index.sort.order", DEFAULT_SORTED_ORDERS) - .put("index.number_of_shards", DEFAULT_NUMBER_OF_SHARDS) - .put("index.number_of_replicas", DEFAULT_NUMBER_OF_REPLICA) + createIndexRequest.settings( + Settings.builder() + .putList("index.sort.field", DEFAULT_SORTED_FIELDS) + .putList("index.sort.order", DEFAULT_SORTED_ORDERS) + .put("index.number_of_shards", DEFAULT_NUMBER_OF_SHARDS) + .put("index.number_of_replicas", DEFAULT_NUMBER_OF_REPLICA) ); createIndexRequest.mapping(readIndexMappings()); @@ -147,6 +150,7 @@ public void onResponse(CreateIndexResponse createIndexResponse) { } } } + @Override public void onFailure(Exception e) { if (e instanceof ResourceAlreadyExistsException) { @@ -175,7 +179,8 @@ private void bulk(final String indexName, final List records) final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1)); for (SearchQueryRecord record : records) { bulkRequestBuilder.add( - new IndexRequest(indexName).id(record.getId()).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + new IndexRequest(indexName).id(record.getId()) + .source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) ); } bulkRequestBuilder.execute(new ActionListener() { @@ -225,7 +230,7 @@ public void deleteExpiredTopNIndices(final Map indexMetad long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(deleteAfter); for (Map.Entry entry : indexMetadataMap.entrySet()) { String indexName = entry.getKey(); - if (isTopQueriesIndex(indexName) && entry.getValue().getCreationDate() <= expirationMillisLong) { + if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) { // delete this index TopQueriesService.deleteSingleIndex(indexName, client); } @@ -263,8 +268,7 @@ private boolean checkIndexExists(String indexName) { */ private String readIndexMappings() throws IOException { return new String( - Objects.requireNonNull(LocalIndexExporter.class.getClassLoader().getResourceAsStream(indexMapping)) - .readAllBytes(), + Objects.requireNonNull(LocalIndexExporter.class.getClassLoader().getResourceAsStream(indexMapping)).readAllBytes(), Charset.defaultCharset() ); } diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java index 8146196a..e18f44a6 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java @@ -76,7 +76,13 @@ public void validateExporterType(final String exporterType) throws IllegalArgume */ public QueryInsightsExporter createExporter(String id, SinkType type, String indexPattern, String indexMapping) { if (SinkType.LOCAL_INDEX.equals(type)) { - QueryInsightsExporter exporter = new LocalIndexExporter(client, clusterService, DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT), indexMapping, id); + QueryInsightsExporter exporter = new LocalIndexExporter( + client, + clusterService, + DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT), + indexMapping, + id + ); this.exporters.put(id, exporter); return exporter; } diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java index 7abc9b7a..69996ca4 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java @@ -116,7 +116,7 @@ public List read(final String from, final String to, String i while (curr.isBefore(end.plusDays(1).toLocalDate().atStartOfDay(end.getZone()))) { String indexName = buildLocalIndexName(curr); SearchRequest searchRequest = new SearchRequest(indexName); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(1000); MatchQueryBuilder excludeQuery = QueryBuilders.matchQuery("indices", "top_queries*"); RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("timestamp") .from(start.toInstant().toEpochMilli()) diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 5942f778..b70af99a 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -63,6 +63,7 @@ * information related to search queries */ public class QueryInsightsService extends AbstractLifecycleComponent { + public static final String QUERY_INSIGHTS_INDEX_TAG_NAME = "query_insights_feature_space"; private static final Logger logger = LogManager.getLogger(QueryInsightsService.class); @@ -143,7 +144,7 @@ public QueryInsightsService( enableCollect = new HashMap<>(); queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY); this.threadPool = threadPool; - this.queryInsightsExporterFactory = new QueryInsightsExporterFactory(client); + this.queryInsightsExporterFactory = new QueryInsightsExporterFactory(client, clusterService); this.queryInsightsReaderFactory = new QueryInsightsReaderFactory(client); this.namedXContentRegistry = namedXContentRegistry; this.client = client; @@ -581,7 +582,7 @@ private void deleteExpiredTopNIndices() { void deleteAllTopNIndices(final Client client, final Map indexMetadataMap) { indexMetadataMap.entrySet() .stream() - .filter(entry -> isTopQueriesIndex(entry.getKey())) + .filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue())) .forEach(entry -> deleteSingleIndex(entry.getKey(), client)); } diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java index f802213d..880d8f07 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -8,6 +8,7 @@ package org.opensearch.plugin.insights.core.service; +import static org.opensearch.plugin.insights.core.service.QueryInsightsService.QUERY_INSIGHTS_INDEX_TAG_NAME; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MAX_DELETE_AFTER_VALUE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MIN_DELETE_AFTER_VALUE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR; @@ -28,6 +29,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; @@ -37,6 +39,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; @@ -64,6 +67,7 @@ public class TopQueriesService { public static final String TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID = "top_queries_local_index_exporter"; public static final String TOP_QUERIES_LOCAL_INDEX_READER_ID = "top_queries_local_index_reader"; + public static final String TOP_QUERIES_INDEX_TAG_VALUE = "top_n_queries"; private static final String METRIC_TYPE_TAG = "metric_type"; private static final String GROUPBY_TAG = "groupby"; @@ -538,35 +542,55 @@ public void onFailure(Exception e) { } /** - * Validates if the input string is a Query Insights local index name - * in the format "top_queries-YYYY.MM.dd-XXXXX". + * Validates if the input string is a Query Insights local index + * in the format "top_queries-YYYY.MM.dd-XXXXX", and has the expected index metadata. * - * @param indexName the string to validate. + * @param indexName the index name to validate. + * @param indexMetadata the metadata associated with the index * @return {@code true} if the string is valid, {@code false} otherwise. */ - public static boolean isTopQueriesIndex(String indexName) { - // Split the input string by '-' - String[] parts = indexName.split("-"); + public static boolean isTopQueriesIndex(String indexName, IndexMetadata indexMetadata) { + try { + if (indexMetadata == null || indexMetadata.mapping() == null) { + return false; + } + Map sourceMap = Objects.requireNonNull(indexMetadata.mapping()).getSourceAsMap(); + if (sourceMap == null || !sourceMap.containsKey("_meta")) { + return false; + } + Map metaMap = (Map) sourceMap.get("_meta"); + if (metaMap == null || !metaMap.containsKey(QUERY_INSIGHTS_INDEX_TAG_NAME)) { + return false; + } + if (!metaMap.get(QUERY_INSIGHTS_INDEX_TAG_NAME).equals(TOP_QUERIES_INDEX_TAG_VALUE)) { + return false; + } - // Check if the string has exactly 3 parts - if (parts.length != 3) { - return false; - } + // Split the input string by '-' + String[] parts = indexName.split("-"); - // Validate the first part is "top_queries" - if (!"top_queries".equals(parts[0])) { - return false; - } + // Check if the string has exactly 3 parts + if (parts.length != 3) { + return false; + } - // Validate the second part is a valid date in "YYYY.MM.dd" format - DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy.MM.dd", Locale.ROOT); - try { - LocalDate.parse(parts[1], formatter); - } catch (DateTimeParseException e) { + // Validate the first part is "top_queries" + if (!"top_queries".equals(parts[0])) { + return false; + } + + // Validate the second part is a valid date in "YYYY.MM.dd" format + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy.MM.dd", Locale.ROOT); + try { + LocalDate.parse(parts[1], formatter); + } catch (DateTimeParseException e) { + return false; + } + + // Validate the third part is exactly 5 digits + return parts[2].matches("\\d{5}"); + } catch (Exception e) { return false; } - - // Validate the third part is exactly 5 digits - return parts[2].matches("\\d{5}"); } } From cd14db4695f633a07706fd91b789ab06c6de7cfd Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Thu, 23 Jan 2025 21:06:22 -0800 Subject: [PATCH 4/7] fix unit tests Signed-off-by: Chenyang Ji --- .../insights/core/exporter/DebugExporter.java | 2 +- .../core/exporter/LocalIndexExporter.java | 61 +++---- .../core/reader/LocalIndexReader.java | 2 +- .../core/service/QueryInsightsService.java | 29 +++- .../core/service/TopQueriesService.java | 23 --- .../core/utils/ExporterReaderUtils.java | 38 +++++ .../insights/core/utils/package-info.java | 12 ++ .../exporter/LocalIndexExporterTests.java | 142 ++++++++++++---- .../QueryInsightsExporterFactoryTests.java | 27 ++- .../service/QueryInsightsServiceTests.java | 93 +++++++++- .../core/service/TopQueriesServiceTests.java | 159 ++++++++++++++++-- 11 files changed, 468 insertions(+), 120 deletions(-) create mode 100644 src/main/java/org/opensearch/plugin/insights/core/utils/ExporterReaderUtils.java create mode 100644 src/main/java/org/opensearch/plugin/insights/core/utils/package-info.java diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java index 2fbfa5b9..2ad34bbc 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java @@ -16,7 +16,7 @@ /** * Debug exporter for development purpose */ -public final class DebugExporter implements QueryInsightsExporter { +public class DebugExporter implements QueryInsightsExporter { /** * Logger of the debug exporter */ diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java index b1c13699..73daf30b 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java @@ -8,46 +8,43 @@ package org.opensearch.plugin.insights.core.exporter; -import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex; +import static org.opensearch.plugin.insights.core.utils.ExporterReaderUtils.generateLocalIndexDateHash; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_DELETE_AFTER_VALUE; import java.io.IOException; import java.nio.charset.Charset; -import java.time.Instant; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.List; -import java.util.Locale; -import java.util.Map; import java.util.Objects; -import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.bulk.BulkRequestBuilder; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.client.Client; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.plugin.insights.core.metrics.OperationalMetric; import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter; -import org.opensearch.plugin.insights.core.service.TopQueriesService; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; /** * Local index exporter for exporting query insights data to local OpenSearch indices. */ -public final class LocalIndexExporter implements QueryInsightsExporter { +public class LocalIndexExporter implements QueryInsightsExporter { /** * Logger of the local index exporter */ @@ -110,7 +107,7 @@ public DateTimeFormatter getIndexPattern() { * * @param indexPattern index pattern */ - void setIndexPattern(DateTimeFormatter indexPattern) { + public void setIndexPattern(DateTimeFormatter indexPattern) { this.indexPattern = indexPattern; } @@ -153,7 +150,8 @@ public void onResponse(CreateIndexResponse createIndexResponse) { @Override public void onFailure(Exception e) { - if (e instanceof ResourceAlreadyExistsException) { + Throwable cause = ExceptionsHelper.unwrapCause(e); + if (cause instanceof ResourceAlreadyExistsException) { try { bulk(indexName, records); } catch (IOException ex) { @@ -222,34 +220,37 @@ public void setDeleteAfter(final int deleteAfter) { } /** - * Delete Top N local indices older than the configured data retention period + * Get local index exporter data retention period * - * @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata} + * @return the number of days after which Top N local indices should be deleted */ - public void deleteExpiredTopNIndices(final Map indexMetadataMap) { - long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(deleteAfter); - for (Map.Entry entry : indexMetadataMap.entrySet()) { - String indexName = entry.getKey(); - if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) { - // delete this index - TopQueriesService.deleteSingleIndex(indexName, client); - } - } + public int getDeleteAfter() { + return deleteAfter; } /** - * Generates a consistent 5-digit numeric hash based on the current UTC date. - * The generated hash is deterministic, meaning it will return the same result for the same date. + * Deletes the specified index and logs any failure that occurs during the operation. * - * @return A 5-digit numeric string representation of the current date's hash. + * @param indexName The name of the index to delete. + * @param client The OpenSearch client used to perform the deletion. */ - public static String generateLocalIndexDateHash() { - // Get the current date in UTC (yyyy-MM-dd format) - String currentDate = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ROOT) - .format(Instant.now().atOffset(ZoneOffset.UTC).toLocalDate()); + public void deleteSingleIndex(String indexName, Client client) { + Logger logger = LogManager.getLogger(); + client.admin().indices().delete(new DeleteIndexRequest(indexName), new ActionListener<>() { + @Override + // CS-SUPPRESS-SINGLE: RegexpSingleline It is not possible to use phrase "cluster manager" instead of master here + public void onResponse(org.opensearch.action.support.master.AcknowledgedResponse acknowledgedResponse) {} - // Generate a 5-digit numeric hash from the date's hashCode - return String.format(Locale.ROOT, "%05d", (currentDate.hashCode() % 100000 + 100000) % 100000); + @Override + public void onFailure(Exception e) { + Throwable cause = ExceptionsHelper.unwrapCause(e); + if (cause instanceof IndexNotFoundException) { + return; + } + OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_DELETE_FAILURES); + logger.error("Failed to delete index '{}': ", indexName, e); + } + }); } /** diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java index 69996ca4..f60f7d24 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java @@ -8,7 +8,7 @@ package org.opensearch.plugin.insights.core.reader; -import static org.opensearch.plugin.insights.core.exporter.LocalIndexExporter.generateLocalIndexDateHash; +import static org.opensearch.plugin.insights.core.utils.ExporterReaderUtils.generateLocalIndexDateHash; import java.time.ZoneOffset; import java.time.ZonedDateTime; diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index b70af99a..cde9df71 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -10,7 +10,6 @@ import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID; import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_READER_ID; -import static org.opensearch.plugin.insights.core.service.TopQueriesService.deleteSingleIndex; import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN; @@ -426,7 +425,7 @@ private void setExporterAndReader(final SinkType sinkType, final Map ((LocalIndexExporter) topQueriesExporter).deleteExpiredTopNIndices(clusterService.state().metadata().indices()) + final LocalIndexExporter localIndexExporter = (LocalIndexExporter) topQueriesExporter; + threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> { + final Map indexMetadataMap = clusterService.state().metadata().indices(); + long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis( + ((LocalIndexExporter) topQueriesExporter).getDeleteAfter() ); + for (Map.Entry entry : indexMetadataMap.entrySet()) { + String indexName = entry.getKey(); + if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) { + // delete this index + localIndexExporter.deleteSingleIndex(indexName, client); + } + } + }); } } @@ -579,11 +588,15 @@ private void deleteExpiredTopNIndices() { * * @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata} */ - void deleteAllTopNIndices(final Client client, final Map indexMetadataMap) { + void deleteAllTopNIndices( + final Client client, + final Map indexMetadataMap, + final LocalIndexExporter localIndexExporter + ) { indexMetadataMap.entrySet() .stream() .filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue())) - .forEach(entry -> deleteSingleIndex(entry.getKey(), client)); + .forEach(entry -> localIndexExporter.deleteSingleIndex(entry.getKey(), client)); } /** diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java index 880d8f07..e4cbed12 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -37,11 +37,9 @@ import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.unit.TimeValue; -import org.opensearch.core.action.ActionListener; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; import org.opensearch.plugin.insights.core.metrics.OperationalMetric; @@ -520,27 +518,6 @@ static void validateExporterDeleteAfter(final int deleteAfter) { } } - /** - * Deletes the specified index and logs any failure that occurs during the operation. - * - * @param indexName The name of the index to delete. - * @param client The OpenSearch client used to perform the deletion. - */ - public static void deleteSingleIndex(String indexName, Client client) { - Logger logger = LogManager.getLogger(); - client.admin().indices().delete(new DeleteIndexRequest(indexName), new ActionListener<>() { - @Override - // CS-SUPPRESS-SINGLE: RegexpSingleline It is not possible to use phrase "cluster manager" instead of master here - public void onResponse(org.opensearch.action.support.master.AcknowledgedResponse acknowledgedResponse) {} - - @Override - public void onFailure(Exception e) { - OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_DELETE_FAILURES); - logger.error("Failed to delete index '{}': ", indexName, e); - } - }); - } - /** * Validates if the input string is a Query Insights local index * in the format "top_queries-YYYY.MM.dd-XXXXX", and has the expected index metadata. diff --git a/src/main/java/org/opensearch/plugin/insights/core/utils/ExporterReaderUtils.java b/src/main/java/org/opensearch/plugin/insights/core/utils/ExporterReaderUtils.java new file mode 100644 index 00000000..2b75042d --- /dev/null +++ b/src/main/java/org/opensearch/plugin/insights/core/utils/ExporterReaderUtils.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.utils; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Locale; + +/** + * Util functions for exporter and reader + * + */ +public class ExporterReaderUtils { + + private ExporterReaderUtils() {} + + /** + * Generates a consistent 5-digit numeric hash based on the current UTC date. + * The generated hash is deterministic, meaning it will return the same result for the same date. + * + * @return A 5-digit numeric string representation of the current date's hash. + */ + public static String generateLocalIndexDateHash() { + // Get the current date in UTC (yyyy-MM-dd format) + String currentDate = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ROOT) + .format(Instant.now().atOffset(ZoneOffset.UTC).toLocalDate()); + + // Generate a 5-digit numeric hash from the date's hashCode + return String.format(Locale.ROOT, "%05d", (currentDate.hashCode() % 100000 + 100000) % 100000); + } +} diff --git a/src/main/java/org/opensearch/plugin/insights/core/utils/package-info.java b/src/main/java/org/opensearch/plugin/insights/core/utils/package-info.java new file mode 100644 index 00000000..99c76e22 --- /dev/null +++ b/src/main/java/org/opensearch/plugin/insights/core/utils/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Util functions + */ +package org.opensearch.plugin.insights.core.utils; diff --git a/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java b/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java index 00ff6d34..2a66bb08 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java @@ -16,31 +16,41 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; -import static org.opensearch.plugin.insights.core.exporter.LocalIndexExporter.generateLocalIndexDateHash; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN; +import static org.opensearch.plugin.insights.core.service.QueryInsightsService.QUERY_INSIGHTS_INDEX_TAG_NAME; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_INDEX_TAG_VALUE; +import static org.opensearch.plugin.insights.core.utils.ExporterReaderUtils.generateLocalIndexDateHash; -import java.time.Instant; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; -import java.time.temporal.ChronoUnit; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.junit.Before; import org.opensearch.Version; import org.opensearch.action.bulk.BulkAction; import org.opensearch.action.bulk.BulkRequestBuilder; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.replication.ClusterStateCreationUtils; import org.opensearch.client.AdminClient; import org.opensearch.client.Client; import org.opensearch.client.IndicesAdminClient; +import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.io.IOUtils; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; /** * Granular tests for the {@link LocalIndexExporterTests} class. @@ -51,15 +61,48 @@ public class LocalIndexExporterTests extends OpenSearchTestCase { private final AdminClient adminClient = mock(AdminClient.class); private final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); private LocalIndexExporter localIndexExporter; + private final ThreadPool threadPool = new TestThreadPool("QueryInsightsThreadPool"); + private String indexName; + private ClusterService clusterService; @Before public void setup() { - localIndexExporter = new LocalIndexExporter(client, format, "id"); + indexName = format.format(ZonedDateTime.now(ZoneOffset.UTC)) + "-" + generateLocalIndexDateHash(); + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary(indexName, true, 1 + randomInt(3), randomInt(2)); + clusterService = ClusterServiceUtils.createClusterService(threadPool, state.getNodes().getLocalNode(), clusterSettings); + + RoutingTable.Builder routingTable = RoutingTable.builder(state.routingTable()); + routingTable.addAsRecovery( + IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + ) + .putMapping( + new MappingMetadata("_doc", Map.of("_meta", Map.of(QUERY_INSIGHTS_INDEX_TAG_NAME, TOP_QUERIES_INDEX_TAG_VALUE))) + ) + .build() + ); + ClusterState updatedState = ClusterState.builder(state).routingTable(routingTable.build()).build(); + ClusterServiceUtils.setState(clusterService, updatedState); + localIndexExporter = new LocalIndexExporter(client, clusterService, format, "", "id"); when(client.admin()).thenReturn(adminClient); when(adminClient.indices()).thenReturn(indicesAdminClient); } + @Override + public void tearDown() throws Exception { + super.tearDown(); + IOUtils.close(clusterService); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + public void testExportEmptyRecords() { List records = List.of(); try { @@ -70,7 +113,7 @@ public void testExportEmptyRecords() { } @SuppressWarnings("unchecked") - public void testExportRecords() { + public void testExportRecordsWhenIndexExists() { BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(client, BulkAction.INSTANCE)); final PlainActionFuture future = mock(PlainActionFuture.class); when(future.actionGet()).thenReturn(null); @@ -86,6 +129,33 @@ public void testExportRecords() { assertEquals(2, bulkRequestBuilder.numberOfActions()); } + public void testExportRecordsWhenIndexNotExist() { + RoutingTable.Builder routingTable = RoutingTable.builder(); + routingTable.addAsRecovery( + IndexMetadata.builder("another_index") + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + ) + .putMapping( + new MappingMetadata("_doc", Map.of("_meta", Map.of(QUERY_INSIGHTS_INDEX_TAG_NAME, TOP_QUERIES_INDEX_TAG_VALUE))) + ) + .build() + ); + ClusterState updatedState = ClusterState.builder(clusterService.state()).routingTable(routingTable.build()).build(); + ClusterServiceUtils.setState(clusterService, updatedState); + + List records = QueryInsightsTestUtils.generateQueryInsightRecords(2); + try { + localIndexExporter.export(records); + } catch (Exception e) { + fail("No exception should be thrown when exporting query insights data"); + } + verify(indicesAdminClient, times(1)).create(any(), any()); + } + @SuppressWarnings("unchecked") public void testExportRecordsWithError() { BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(client, BulkAction.INSTANCE)); @@ -116,32 +186,32 @@ public void testGetAndSetIndexPattern() { assert (localIndexExporter.getIndexPattern() == newFormatter); } - public void testDeleteExpiredTopNIndices() { - // Reset exporter index pattern to default - localIndexExporter.setIndexPattern(DateTimeFormatter.ofPattern(DEFAULT_TOP_N_QUERIES_INDEX_PATTERN, Locale.ROOT)); - - // Create 9 top_queries-* indices - Map indexMetadataMap = new HashMap<>(); - for (int i = 1; i < 10; i++) { - String indexName = "top_queries-2024.01.0" + i + "-" + generateLocalIndexDateHash(); - long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); - - IndexMetadata indexMetadata = IndexMetadata.builder(indexName) - .settings( - Settings.builder() - .put("index.version.created", Version.CURRENT.id) - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 1) - .put(SETTING_CREATION_DATE, creationTime) - ) - .build(); - indexMetadataMap.put(indexName, indexMetadata); - } - localIndexExporter.deleteExpiredTopNIndices(indexMetadataMap); - // Default retention is 7 days - // Oldest 3 of 10 indices should be deleted - verify(client, times(3)).admin(); - verify(adminClient, times(3)).indices(); - verify(indicesAdminClient, times(3)).delete(any(), any()); - } + // public void testDeleteExpiredTopNIndices() { + // // Reset exporter index pattern to default + // localIndexExporter.setIndexPattern(DateTimeFormatter.ofPattern(DEFAULT_TOP_N_QUERIES_INDEX_PATTERN, Locale.ROOT)); + // + // // Create 9 top_queries-* indices + // Map indexMetadataMap = new HashMap<>(); + // for (int i = 1; i < 10; i++) { + // String indexName = "top_queries-2024.01.0" + i + "-" + generateLocalIndexDateHash(); + // long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); + // + // IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + // .settings( + // Settings.builder() + // .put("index.version.created", Version.CURRENT.id) + // .put("index.number_of_shards", 1) + // .put("index.number_of_replicas", 1) + // .put(SETTING_CREATION_DATE, creationTime) + // ) + // .build(); + // indexMetadataMap.put(indexName, indexMetadata); + // } + // localIndexExporter.deleteExpiredTopNIndices(indexMetadataMap); + // // Default retention is 7 days + // // Oldest 3 of 10 indices should be deleted + // verify(client, times(3)).admin(); + // verify(adminClient, times(3)).indices(); + // verify(indicesAdminClient, times(3)).delete(any(), any()); + // } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java b/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java index c1b40030..4afb3c7e 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java @@ -16,10 +16,15 @@ import java.util.Locale; import org.junit.Before; import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter; import org.opensearch.telemetry.metrics.Counter; import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; /** * Granular tests for the {@link QueryInsightsExporterFactoryTests} class. @@ -30,10 +35,16 @@ public class QueryInsightsExporterFactoryTests extends OpenSearchTestCase { private final Client client = mock(Client.class); private QueryInsightsExporterFactory queryInsightsExporterFactory; private MetricsRegistry metricsRegistry; + private ClusterService clusterService; + private final ThreadPool threadPool = mock(ThreadPool.class); @Before public void setup() { - queryInsightsExporterFactory = new QueryInsightsExporterFactory(client); + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, threadPool); + queryInsightsExporterFactory = new QueryInsightsExporterFactory(client, clusterService); metricsRegistry = mock(MetricsRegistry.class); when(metricsRegistry.createCounter(any(String.class), any(String.class), any(String.class))).thenAnswer( invocation -> mock(Counter.class) @@ -55,11 +66,11 @@ public void testInvalidExporterTypeConfig() { } public void testCreateAndCloseExporter() { - QueryInsightsExporter exporter1 = queryInsightsExporterFactory.createExporter("id", SinkType.LOCAL_INDEX, format); + QueryInsightsExporter exporter1 = queryInsightsExporterFactory.createExporter("id", SinkType.LOCAL_INDEX, format, ""); assertTrue(exporter1 instanceof LocalIndexExporter); - QueryInsightsExporter exporter2 = queryInsightsExporterFactory.createExporter("id", SinkType.DEBUG, format); + QueryInsightsExporter exporter2 = queryInsightsExporterFactory.createExporter("id", SinkType.DEBUG, format, ""); assertTrue(exporter2 instanceof DebugExporter); - QueryInsightsExporter exporter3 = queryInsightsExporterFactory.createExporter("id", SinkType.DEBUG, format); + QueryInsightsExporter exporter3 = queryInsightsExporterFactory.createExporter("id", SinkType.DEBUG, format, ""); assertTrue(exporter3 instanceof DebugExporter); try { queryInsightsExporterFactory.closeExporter(exporter1); @@ -71,7 +82,13 @@ public void testCreateAndCloseExporter() { } public void testUpdateExporter() { - LocalIndexExporter exporter = new LocalIndexExporter(client, DateTimeFormatter.ofPattern(format, Locale.ROOT), "id"); + LocalIndexExporter exporter = new LocalIndexExporter( + client, + clusterService, + DateTimeFormatter.ofPattern(format, Locale.ROOT), + "", + "id" + ); queryInsightsExporterFactory.updateExporter(exporter, "yyyy-MM-dd-HH"); assertEquals(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH", Locale.ROOT).toString(), exporter.getIndexPattern().toString()); } diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java index ba59bec2..b5e2feaf 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -15,17 +15,21 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; -import static org.opensearch.plugin.insights.core.exporter.LocalIndexExporter.generateLocalIndexDateHash; +import static org.opensearch.plugin.insights.core.service.QueryInsightsService.QUERY_INSIGHTS_INDEX_TAG_NAME; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_INDEX_TAG_VALUE; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.ENTRY_COUNT; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.EVICTIONS; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.HIT_COUNT; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.MISS_COUNT; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.SIZE_IN_BYTES; +import static org.opensearch.plugin.insights.core.utils.ExporterReaderUtils.generateLocalIndexDateHash; import java.time.Instant; +import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; import org.junit.Before; @@ -34,12 +38,15 @@ import org.opensearch.client.Client; import org.opensearch.client.IndicesAdminClient; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.core.exporter.LocalIndexExporter; import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter; import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator; import org.opensearch.plugin.insights.rules.model.GroupingType; @@ -60,6 +67,7 @@ * Unit Tests for {@link QueryInsightsService}. */ public class QueryInsightsServiceTests extends OpenSearchTestCase { + private final DateTimeFormatter format = DateTimeFormatter.ofPattern("YYYY.MM.dd", Locale.ROOT); private ThreadPool threadPool; private final Client client = mock(Client.class); private final NamedXContentRegistry namedXContentRegistry = mock(NamedXContentRegistry.class); @@ -67,9 +75,12 @@ public class QueryInsightsServiceTests extends OpenSearchTestCase { private QueryInsightsService queryInsightsServiceSpy; private final AdminClient adminClient = mock(AdminClient.class); private final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); + private ClusterService clusterService; + private LocalIndexExporter localIndexExporter; @Before public void setup() { + localIndexExporter = mock(LocalIndexExporter.class); Settings.Builder settingsBuilder = Settings.builder(); Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); @@ -78,7 +89,7 @@ public void setup() { "QueryInsightsHealthStatsTests", new ScalingExecutorBuilder(QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR, 1, 5, TimeValue.timeValueMinutes(5)) ); - ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool); + clusterService = new ClusterService(settings, clusterSettings, threadPool); queryInsightsService = new QueryInsightsService( clusterService, threadPool, @@ -105,6 +116,7 @@ public void setup() { @Override public void tearDown() throws Exception { super.tearDown(); + IOUtils.close(clusterService); ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } @@ -256,6 +268,9 @@ public void testDeleteAllTopNIndices() { .put("index.number_of_replicas", 1) .put(SETTING_CREATION_DATE, creationTime) ) + .putMapping( + new MappingMetadata("_doc", Map.of("_meta", Map.of(QUERY_INSIGHTS_INDEX_TAG_NAME, TOP_QUERIES_INDEX_TAG_VALUE))) + ) .build(); indexMetadataMap.put(indexName, indexMetadata); } @@ -272,14 +287,80 @@ public void testDeleteAllTopNIndices() { .put("index.number_of_replicas", 1) .put(SETTING_CREATION_DATE, creationTime) ) + .putMapping( + new MappingMetadata("_doc", Map.of("_meta", Map.of(QUERY_INSIGHTS_INDEX_TAG_NAME, TOP_QUERIES_INDEX_TAG_VALUE))) + ) .build(); indexMetadataMap.put(indexName, indexMetadata); } - queryInsightsService.deleteAllTopNIndices(client, indexMetadataMap); + queryInsightsService.deleteAllTopNIndices(client, indexMetadataMap, localIndexExporter); // All 10 indices should be deleted - verify(client, times(9)).admin(); - verify(adminClient, times(9)).indices(); - verify(indicesAdminClient, times(9)).delete(any(), any()); + verify(localIndexExporter, times(9)).deleteSingleIndex(any(), any()); } + + // TODO: comment out for now. Need to reenable this before mering + // public void testDeleteExpiredTopNIndices() { + // // create a new cluster state with expired index mapping + // Settings.Builder settingsBuilder = Settings.builder(); + // Settings settings = settingsBuilder.build(); + // ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + // QueryInsightsTestUtils.registerAllQueryInsightsSettings(clusterSettings); + // ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary("", true, 1 + randomInt(3), randomInt(2)); + // ClusterService newClusterService = ClusterServiceUtils.createClusterService(threadPool, state.getNodes().getLocalNode(), + // clusterSettings); + // + // RoutingTable.Builder routingTable = RoutingTable.builder(state.routingTable()); + // + // // Create 9 top_queries-* indices + // Map indexMetadataMap = new HashMap<>(); + // for (int i = 1; i < 10; i++) { + // String indexName = "top_queries-2024.01.0" + i + "-" + generateLocalIndexDateHash(); + // long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); + // + // IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + // .settings( + // Settings.builder() + // .put("index.version.created", Version.CURRENT.id) + // .put("index.number_of_shards", 1) + // .put("index.number_of_replicas", 1) + // .put(SETTING_CREATION_DATE, creationTime) + // ) + // .build(); + // indexMetadataMap.put(indexName, indexMetadata); + // routingTable.addAsRecovery(IndexMetadata.builder(indexName) + // .settings( + // Settings.builder() + // .put("index.version.created", Version.CURRENT.id) + // .put("index.number_of_shards", 1) + // .put("index.number_of_replicas", 1) + // ) + // .putMapping(new MappingMetadata("_doc", Map.of( + // "_meta", Map.of(QUERY_INSIGHTS_INDEX_TAG_NAME, TOP_QUERIES_INDEX_TAG_VALUE) + // ))) + // .build()); + // } + // ClusterState updatedState = ClusterState.builder(state).routingTable(routingTable.build()).build(); + // ClusterServiceUtils.setState(newClusterService, updatedState); + // QueryInsightsService newQueryInsightsService = new QueryInsightsService( + // newClusterService, + // threadPool, + // client, + // NoopMetricsRegistry.INSTANCE, + // namedXContentRegistry + // ); + // newQueryInsightsService.enableCollection(MetricType.LATENCY, true); + // newQueryInsightsService.enableCollection(MetricType.CPU, true); + // newQueryInsightsService.enableCollection(MetricType.MEMORY, true); + // newQueryInsightsService.setQueryShapeGenerator(new QueryShapeGenerator(clusterService)); + // newQueryInsightsService.queryInsightsExporterFactory.createExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID, SinkType.LOCAL_INDEX, + // "YYYY.MM.dd", ""); + // + // newQueryInsightsService.deleteExpiredTopNIndices(); + // // Default retention is 7 days + // // Oldest 3 of 10 indices should be deleted + // verify(client, times(3)).admin(); + // verify(adminClient, times(3)).indices(); + // verify(indicesAdminClient, times(3)).delete(any(), any()); + // } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java index 27d2abc9..0971fc25 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java @@ -11,17 +11,26 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; +import static org.opensearch.plugin.insights.core.service.QueryInsightsService.QUERY_INSIGHTS_INDEX_TAG_NAME; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_INDEX_TAG_VALUE; import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex; import static org.opensearch.plugin.insights.core.service.TopQueriesService.validateExporterDeleteAfter; +import java.time.Instant; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.junit.Before; +import org.opensearch.Version; import org.opensearch.client.AdminClient; import org.opensearch.client.Client; import org.opensearch.client.IndicesAdminClient; import org.opensearch.cluster.coordination.DeterministicTaskQueue; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; @@ -211,17 +220,147 @@ public void testValidateExporterDeleteAfter() { ); } - public void testIsTopQueriesIndex() { - assertTrue(isTopQueriesIndex("top_queries-2024.01.01-01234")); - assertTrue(isTopQueriesIndex("top_queries-2025.12.12-99999")); + private IndexMetadata createValidIndexMetadata(String indexName) { + // valid index metadata + long creationTime = Instant.now().toEpochMilli(); + return IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(SETTING_CREATION_DATE, creationTime) + ) + .putMapping(new MappingMetadata("_doc", Map.of("_meta", Map.of(QUERY_INSIGHTS_INDEX_TAG_NAME, TOP_QUERIES_INDEX_TAG_VALUE)))) + .build(); + } + + public void testIsTopQueriesIndexWithValidMetaData() { + assertTrue(isTopQueriesIndex("top_queries-2024.01.01-01234", createValidIndexMetadata("top_queries-2024.01.01-01234"))); + assertTrue(isTopQueriesIndex("top_queries-2025.12.12-99999", createValidIndexMetadata("top_queries-2025.12.12-99999"))); + assertFalse(isTopQueriesIndex("top_queries-2024.01.01-012345", createValidIndexMetadata("top_queries-2024.01.01-012345"))); + assertFalse(isTopQueriesIndex("top_queries-2024.01.01-0123w", createValidIndexMetadata("top_queries-2024.01.01-0123w"))); + assertFalse(isTopQueriesIndex("top_queries-2024.01.01", createValidIndexMetadata("top_queries-2024.01.01"))); + assertFalse(isTopQueriesIndex("top_queries-2024.01.32-01234", createValidIndexMetadata("top_queries-2024.01.32-01234"))); + assertFalse(isTopQueriesIndex("top_queries-01234", createValidIndexMetadata("top_queries-01234"))); + assertFalse(isTopQueriesIndex("top_querie-2024.01.01-01234", createValidIndexMetadata("top_querie-2024.01.01-01234"))); + assertFalse(isTopQueriesIndex("2024.01.01-01234", createValidIndexMetadata("2024.01.01-01234"))); + assertFalse(isTopQueriesIndex("any_index", createValidIndexMetadata("any_index"))); + assertFalse(isTopQueriesIndex("", createValidIndexMetadata(""))); + assertFalse(isTopQueriesIndex("_customer_index", createValidIndexMetadata("_customer_index"))); + } + + private IndexMetadata createIndexMetadataWithEmptyMapping(String indexName) { + long creationTime = Instant.now().toEpochMilli(); + return IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(SETTING_CREATION_DATE, creationTime) + ) + .build(); + } + + public void testIsTopQueriesIndexWithEmptyMetaData() { + assertFalse(isTopQueriesIndex("top_queries-2024.01.01-01234", createIndexMetadataWithEmptyMapping("top_queries-2024.01.01-01234"))); + assertFalse(isTopQueriesIndex("top_queries-2025.12.12-99999", createIndexMetadataWithEmptyMapping("top_queries-2025.12.12-99999"))); + assertFalse( + isTopQueriesIndex("top_queries-2024.01.01-012345", createIndexMetadataWithEmptyMapping("top_queries-2024.01.01-012345")) + ); + assertFalse(isTopQueriesIndex("top_queries-2024.01.01-0123w", createIndexMetadataWithEmptyMapping("top_queries-2024.01.01-0123w"))); + assertFalse(isTopQueriesIndex("top_queries-2024.01.01", createIndexMetadataWithEmptyMapping("top_queries-2024.01.01"))); + assertFalse(isTopQueriesIndex("top_queries-2024.01.32-01234", createIndexMetadataWithEmptyMapping("top_queries-2024.01.32-01234"))); + assertFalse(isTopQueriesIndex("top_queries-01234", createIndexMetadataWithEmptyMapping("top_queries-01234"))); + assertFalse(isTopQueriesIndex("top_querie-2024.01.01-01234", createIndexMetadataWithEmptyMapping("top_querie-2024.01.01-01234"))); + assertFalse(isTopQueriesIndex("2024.01.01-01234", createIndexMetadataWithEmptyMapping("2024.01.01-01234"))); + assertFalse(isTopQueriesIndex("any_index", createIndexMetadataWithEmptyMapping("any_index"))); + assertFalse(isTopQueriesIndex("", createIndexMetadataWithEmptyMapping(""))); + assertFalse(isTopQueriesIndex("_customer_index", createIndexMetadataWithEmptyMapping("_customer_index"))); + } + + private IndexMetadata createIndexMetadataWithDifferentValue(String indexName) { + long creationTime = Instant.now().toEpochMilli(); + return IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(SETTING_CREATION_DATE, creationTime) + ) + .putMapping(new MappingMetadata("_doc", Map.of("_meta", Map.of(QUERY_INSIGHTS_INDEX_TAG_NAME, "someOtherTag")))) + .build(); + } + + public void testIsTopQueriesIndexWithDifferentMetaData() { + assertFalse( + isTopQueriesIndex("top_queries-2024.01.01-01234", createIndexMetadataWithDifferentValue("top_queries-2024.01.01-01234")) + ); + assertFalse( + isTopQueriesIndex("top_queries-2025.12.12-99999", createIndexMetadataWithDifferentValue("top_queries-2025.12.12-99999")) + ); + assertFalse( + isTopQueriesIndex("top_queries-2024.01.01-012345", createIndexMetadataWithDifferentValue("top_queries-2024.01.01-012345")) + ); + assertFalse( + isTopQueriesIndex("top_queries-2024.01.01-0123w", createIndexMetadataWithDifferentValue("top_queries-2024.01.01-0123w")) + ); + assertFalse(isTopQueriesIndex("top_queries-2024.01.01", createIndexMetadataWithDifferentValue("top_queries-2024.01.01"))); + assertFalse( + isTopQueriesIndex("top_queries-2024.01.32-01234", createIndexMetadataWithDifferentValue("top_queries-2024.01.32-01234")) + ); + assertFalse(isTopQueriesIndex("top_queries-01234", createIndexMetadataWithDifferentValue("top_queries-01234"))); + assertFalse(isTopQueriesIndex("top_querie-2024.01.01-01234", createIndexMetadataWithDifferentValue("top_querie-2024.01.01-01234"))); + assertFalse(isTopQueriesIndex("2024.01.01-01234", createIndexMetadataWithDifferentValue("2024.01.01-01234"))); + assertFalse(isTopQueriesIndex("any_index", createIndexMetadataWithDifferentValue("any_index"))); + assertFalse(isTopQueriesIndex("", createIndexMetadataWithDifferentValue(""))); + assertFalse(isTopQueriesIndex("_customer_index", createIndexMetadataWithDifferentValue("_customer_index"))); + } + + private IndexMetadata createIndexMetadataWithExtraValue(String indexName) { + long creationTime = Instant.now().toEpochMilli(); + return IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(SETTING_CREATION_DATE, creationTime) + ) + .putMapping(new MappingMetadata("_doc", Map.of("_meta", Map.of("test", "someOtherTag")))) + .build(); + } + + public void testIsTopQueriesIndexWithExtraMetaData() { + assertFalse(isTopQueriesIndex("top_queries-2024.01.01-01234", createIndexMetadataWithExtraValue("top_queries-2024.01.01-01234"))); + assertFalse(isTopQueriesIndex("top_queries-2025.12.12-99999", createIndexMetadataWithExtraValue("top_queries-2025.12.12-99999"))); + assertFalse(isTopQueriesIndex("top_queries-2024.01.01-012345", createIndexMetadataWithExtraValue("top_queries-2024.01.01-012345"))); + assertFalse(isTopQueriesIndex("top_queries-2024.01.01-0123w", createIndexMetadataWithExtraValue("top_queries-2024.01.01-0123w"))); + assertFalse(isTopQueriesIndex("top_queries-2024.01.01", createIndexMetadataWithExtraValue("top_queries-2024.01.01"))); + assertFalse(isTopQueriesIndex("top_queries-2024.01.32-01234", createIndexMetadataWithExtraValue("top_queries-2024.01.32-01234"))); + assertFalse(isTopQueriesIndex("top_queries-01234", createIndexMetadataWithExtraValue("top_queries-01234"))); + assertFalse(isTopQueriesIndex("top_querie-2024.01.01-01234", createIndexMetadataWithExtraValue("top_querie-2024.01.01-01234"))); + assertFalse(isTopQueriesIndex("2024.01.01-01234", createIndexMetadataWithExtraValue("2024.01.01-01234"))); + assertFalse(isTopQueriesIndex("any_index", createIndexMetadataWithExtraValue("any_index"))); + assertFalse(isTopQueriesIndex("", createIndexMetadataWithExtraValue(""))); + assertFalse(isTopQueriesIndex("_customer_index", createIndexMetadataWithExtraValue("_customer_index"))); + } - assertFalse(isTopQueriesIndex("top_queries-2024.01.01-012345")); - assertFalse(isTopQueriesIndex("top_queries-2024.01.01-0123w")); - assertFalse(isTopQueriesIndex("top_queries-2024.01.01")); - assertFalse(isTopQueriesIndex("top_queries-2024.01.32-01234")); - assertFalse(isTopQueriesIndex("top_queries-01234")); - assertFalse(isTopQueriesIndex("top_querie-2024.01.01-01234")); - assertFalse(isTopQueriesIndex("2024.01.01-01234")); + public void testIsTopQueriesIndexWithNullMetaData() { + assertFalse(isTopQueriesIndex("top_queries-2024.01.01-01234", null)); + assertFalse(isTopQueriesIndex("top_queries-2025.12.12-99999", null)); + assertFalse(isTopQueriesIndex("top_queries-2024.01.01-012345", null)); + assertFalse(isTopQueriesIndex("top_queries-2024.01.01-0123w", null)); + assertFalse(isTopQueriesIndex("top_queries-2024.01.01", null)); + assertFalse(isTopQueriesIndex("top_queries-2024.01.32-01234", null)); + assertFalse(isTopQueriesIndex("top_queries-01234", null)); + assertFalse(isTopQueriesIndex("top_querie-2024.01.01-01234", null)); + assertFalse(isTopQueriesIndex("2024.01.01-01234", null)); + assertFalse(isTopQueriesIndex("any_index", null)); + assertFalse(isTopQueriesIndex("", null)); + assertFalse(isTopQueriesIndex("_customer_index", null)); } public void testTopQueriesForId() { From 9584b389439eb86e280c0a55c0338a90e0888960 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Sat, 25 Jan 2025 16:53:40 -0800 Subject: [PATCH 5/7] fix bug on local index reader with generateLocalIndexDateHash Signed-off-by: Chenyang Ji --- .../insights/core/exporter/LocalIndexExporter.java | 3 ++- .../insights/core/reader/LocalIndexReader.java | 2 +- .../insights/core/utils/ExporterReaderUtils.java | 14 ++++++-------- .../core/exporter/LocalIndexExporterTests.java | 4 +++- .../core/service/QueryInsightsServiceTests.java | 7 ++++++- 5 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java index 73daf30b..6eee8eb9 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java @@ -207,7 +207,8 @@ public void close() { * @return A string representing the index name in the format "top_queries-YYYY.MM.dd-01234". */ String buildLocalIndexName() { - return indexPattern.format(ZonedDateTime.now(ZoneOffset.UTC)) + "-" + generateLocalIndexDateHash(); + ZonedDateTime currentTime = ZonedDateTime.now(ZoneOffset.UTC); + return indexPattern.format(currentTime) + "-" + generateLocalIndexDateHash(currentTime.toLocalDate()); } /** diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java index f60f7d24..c0bbead1 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java @@ -154,6 +154,6 @@ public void close() { } private String buildLocalIndexName(ZonedDateTime current) { - return current.format(indexPattern) + "-" + generateLocalIndexDateHash(); + return current.format(indexPattern) + "-" + generateLocalIndexDateHash(current.toLocalDate()); } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/utils/ExporterReaderUtils.java b/src/main/java/org/opensearch/plugin/insights/core/utils/ExporterReaderUtils.java index 2b75042d..b586e302 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/utils/ExporterReaderUtils.java +++ b/src/main/java/org/opensearch/plugin/insights/core/utils/ExporterReaderUtils.java @@ -8,8 +8,7 @@ package org.opensearch.plugin.insights.core.utils; -import java.time.Instant; -import java.time.ZoneOffset; +import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.Locale; @@ -22,17 +21,16 @@ public class ExporterReaderUtils { private ExporterReaderUtils() {} /** - * Generates a consistent 5-digit numeric hash based on the current UTC date. + * Generates a consistent 5-digit numeric hash based on the given UTC date. * The generated hash is deterministic, meaning it will return the same result for the same date. * * @return A 5-digit numeric string representation of the current date's hash. */ - public static String generateLocalIndexDateHash() { - // Get the current date in UTC (yyyy-MM-dd format) - String currentDate = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ROOT) - .format(Instant.now().atOffset(ZoneOffset.UTC).toLocalDate()); + public static String generateLocalIndexDateHash(LocalDate date) { + // Get the date string in UTC (yyyy-MM-dd format) + String dateString = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ROOT).format(date); // Generate a 5-digit numeric hash from the date's hashCode - return String.format(Locale.ROOT, "%05d", (currentDate.hashCode() % 100000 + 100000) % 100000); + return String.format(Locale.ROOT, "%05d", (dateString.hashCode() % 100000 + 100000) % 100000); } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java b/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java index 2a66bb08..07d02247 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java @@ -67,7 +67,9 @@ public class LocalIndexExporterTests extends OpenSearchTestCase { @Before public void setup() { - indexName = format.format(ZonedDateTime.now(ZoneOffset.UTC)) + "-" + generateLocalIndexDateHash(); + indexName = format.format(ZonedDateTime.now(ZoneOffset.UTC)) + + "-" + + generateLocalIndexDateHash(ZonedDateTime.now(ZoneOffset.UTC).toLocalDate()); Settings.Builder settingsBuilder = Settings.builder(); Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java index b5e2feaf..b78d01bb 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -25,6 +25,8 @@ import static org.opensearch.plugin.insights.core.utils.ExporterReaderUtils.generateLocalIndexDateHash; import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.HashMap; @@ -257,7 +259,10 @@ public void testDeleteAllTopNIndices() { // Create 9 top_queries-* indices Map indexMetadataMap = new HashMap<>(); for (int i = 1; i < 10; i++) { - String indexName = "top_queries-2024.01.0" + i + "-" + generateLocalIndexDateHash(); + String indexName = "top_queries-2024.01.0" + + i + + "-" + + generateLocalIndexDateHash(ZonedDateTime.of(2024, 1, i, 0, 0, 0, 0, ZoneId.of("UTC")).toLocalDate()); long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); IndexMetadata indexMetadata = IndexMetadata.builder(indexName) From 28330042b84406cb3702790d8136a69e0629fb4f Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Sat, 25 Jan 2025 17:47:15 -0800 Subject: [PATCH 6/7] fix exception in query grouper Signed-off-by: Chenyang Ji --- .../grouper/MinMaxHeapQueryGrouper.java | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java b/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java index 33bbecb6..8cb79af3 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java @@ -109,7 +109,11 @@ public SearchQueryRecord add(final SearchQueryRecord searchQueryRecord) { throw new IllegalArgumentException("Do not use addQueryToGroup when GroupingType is None"); } SearchQueryRecord aggregateSearchQueryRecord; + String groupId = getGroupingId(searchQueryRecord); + if (groupId == null) { + return null; + } // 1) New group added to the grouping service // Add to min PQ and overflow records to max PQ (if the number of records in the min PQ exceeds the configured size N) @@ -269,13 +273,18 @@ int numberOfTopGroups() { * @return Grouping Id */ private String getGroupingId(final SearchQueryRecord searchQueryRecord) { - switch (groupingType) { - case SIMILARITY: - return searchQueryRecord.getAttributes().get(Attribute.QUERY_GROUP_HASHCODE).toString(); - case NONE: - throw new IllegalArgumentException("Should not try to group queries if grouping type is NONE"); - default: - throw new IllegalArgumentException("The following grouping type is not supported : " + groupingType); + try { + switch (groupingType) { + case SIMILARITY: + return searchQueryRecord.getAttributes().get(Attribute.QUERY_GROUP_HASHCODE).toString(); + case NONE: + throw new IllegalArgumentException("Should not try to group queries if grouping type is NONE"); + default: + throw new IllegalArgumentException("The following grouping type is not supported : " + groupingType); + } + } catch (Exception e) { + log.error("Error when setting group id", e); + return null; } } From 076166817d4626a768537110c18af985b5528a50 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Sat, 25 Jan 2025 18:25:19 -0800 Subject: [PATCH 7/7] update PR based on comments and re-enable unit tests Signed-off-by: Chenyang Ji --- .../core/reader/LocalIndexReader.java | 3 +- .../exporter/LocalIndexExporterTests.java | 29 --- .../service/QueryInsightsServiceTests.java | 169 +++++++++++------- 3 files changed, 105 insertions(+), 96 deletions(-) diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java index c0bbead1..927a0392 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java @@ -39,6 +39,7 @@ * Local index reader for reading query insights data from local OpenSearch indices. */ public final class LocalIndexReader implements QueryInsightsReader { + private final static int MAX_TOP_N_INDEX_READ_SIZE = 1000; /** * Logger of the local index reader */ @@ -116,7 +117,7 @@ public List read(final String from, final String to, String i while (curr.isBefore(end.plusDays(1).toLocalDate().atStartOfDay(end.getZone()))) { String indexName = buildLocalIndexName(curr); SearchRequest searchRequest = new SearchRequest(indexName); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(1000); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(MAX_TOP_N_INDEX_READ_SIZE); MatchQueryBuilder excludeQuery = QueryBuilders.matchQuery("indices", "top_queries*"); RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("timestamp") .from(start.toInstant().toEpochMilli()) diff --git a/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java b/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java index 07d02247..a605634d 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java @@ -187,33 +187,4 @@ public void testGetAndSetIndexPattern() { localIndexExporter.setIndexPattern(newFormatter); assert (localIndexExporter.getIndexPattern() == newFormatter); } - - // public void testDeleteExpiredTopNIndices() { - // // Reset exporter index pattern to default - // localIndexExporter.setIndexPattern(DateTimeFormatter.ofPattern(DEFAULT_TOP_N_QUERIES_INDEX_PATTERN, Locale.ROOT)); - // - // // Create 9 top_queries-* indices - // Map indexMetadataMap = new HashMap<>(); - // for (int i = 1; i < 10; i++) { - // String indexName = "top_queries-2024.01.0" + i + "-" + generateLocalIndexDateHash(); - // long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); - // - // IndexMetadata indexMetadata = IndexMetadata.builder(indexName) - // .settings( - // Settings.builder() - // .put("index.version.created", Version.CURRENT.id) - // .put("index.number_of_shards", 1) - // .put("index.number_of_replicas", 1) - // .put(SETTING_CREATION_DATE, creationTime) - // ) - // .build(); - // indexMetadataMap.put(indexName, indexMetadata); - // } - // localIndexExporter.deleteExpiredTopNIndices(indexMetadataMap); - // // Default retention is 7 days - // // Oldest 3 of 10 indices should be deleted - // verify(client, times(3)).admin(); - // verify(adminClient, times(3)).indices(); - // verify(indicesAdminClient, times(3)).delete(any(), any()); - // } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java index b78d01bb..d4b03e1d 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -9,6 +9,7 @@ package org.opensearch.plugin.insights.core.service; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -17,6 +18,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; import static org.opensearch.plugin.insights.core.service.QueryInsightsService.QUERY_INSIGHTS_INDEX_TAG_NAME; import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_INDEX_TAG_VALUE; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.ENTRY_COUNT; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.EVICTIONS; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.HIT_COUNT; @@ -26,6 +28,7 @@ import java.time.Instant; import java.time.ZoneId; +import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; @@ -33,14 +36,19 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.Before; import org.opensearch.Version; +import org.opensearch.action.support.replication.ClusterStateCreationUtils; import org.opensearch.client.AdminClient; import org.opensearch.client.Client; import org.opensearch.client.IndicesAdminClient; +import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -49,6 +57,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.plugin.insights.core.exporter.LocalIndexExporter; +import org.opensearch.plugin.insights.core.exporter.SinkType; import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter; import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator; import org.opensearch.plugin.insights.rules.model.GroupingType; @@ -60,6 +69,7 @@ import org.opensearch.telemetry.metrics.Counter; import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; @@ -74,10 +84,12 @@ public class QueryInsightsServiceTests extends OpenSearchTestCase { private final Client client = mock(Client.class); private final NamedXContentRegistry namedXContentRegistry = mock(NamedXContentRegistry.class); private QueryInsightsService queryInsightsService; + private QueryInsightsService newQueryInsightsService; private QueryInsightsService queryInsightsServiceSpy; private final AdminClient adminClient = mock(AdminClient.class); private final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); private ClusterService clusterService; + private ClusterService newClusterService; private LocalIndexExporter localIndexExporter; @Before @@ -118,8 +130,23 @@ public void setup() { @Override public void tearDown() throws Exception { super.tearDown(); - IOUtils.close(clusterService); - ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + if (clusterService != null) { + IOUtils.close(clusterService); + } + if (newClusterService != null) { + IOUtils.close(newClusterService); + } + if (queryInsightsService != null) { + queryInsightsService.doClose(); + } + if (newClusterService != null) { + IOUtils.close(newClusterService); + } + if (newQueryInsightsService != null) { + newQueryInsightsService.doClose(); + } + + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } public void testAddRecordToLimitAndDrain() { @@ -304,68 +331,78 @@ public void testDeleteAllTopNIndices() { verify(localIndexExporter, times(9)).deleteSingleIndex(any(), any()); } - // TODO: comment out for now. Need to reenable this before mering - // public void testDeleteExpiredTopNIndices() { - // // create a new cluster state with expired index mapping - // Settings.Builder settingsBuilder = Settings.builder(); - // Settings settings = settingsBuilder.build(); - // ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - // QueryInsightsTestUtils.registerAllQueryInsightsSettings(clusterSettings); - // ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary("", true, 1 + randomInt(3), randomInt(2)); - // ClusterService newClusterService = ClusterServiceUtils.createClusterService(threadPool, state.getNodes().getLocalNode(), - // clusterSettings); - // - // RoutingTable.Builder routingTable = RoutingTable.builder(state.routingTable()); - // - // // Create 9 top_queries-* indices - // Map indexMetadataMap = new HashMap<>(); - // for (int i = 1; i < 10; i++) { - // String indexName = "top_queries-2024.01.0" + i + "-" + generateLocalIndexDateHash(); - // long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); - // - // IndexMetadata indexMetadata = IndexMetadata.builder(indexName) - // .settings( - // Settings.builder() - // .put("index.version.created", Version.CURRENT.id) - // .put("index.number_of_shards", 1) - // .put("index.number_of_replicas", 1) - // .put(SETTING_CREATION_DATE, creationTime) - // ) - // .build(); - // indexMetadataMap.put(indexName, indexMetadata); - // routingTable.addAsRecovery(IndexMetadata.builder(indexName) - // .settings( - // Settings.builder() - // .put("index.version.created", Version.CURRENT.id) - // .put("index.number_of_shards", 1) - // .put("index.number_of_replicas", 1) - // ) - // .putMapping(new MappingMetadata("_doc", Map.of( - // "_meta", Map.of(QUERY_INSIGHTS_INDEX_TAG_NAME, TOP_QUERIES_INDEX_TAG_VALUE) - // ))) - // .build()); - // } - // ClusterState updatedState = ClusterState.builder(state).routingTable(routingTable.build()).build(); - // ClusterServiceUtils.setState(newClusterService, updatedState); - // QueryInsightsService newQueryInsightsService = new QueryInsightsService( - // newClusterService, - // threadPool, - // client, - // NoopMetricsRegistry.INSTANCE, - // namedXContentRegistry - // ); - // newQueryInsightsService.enableCollection(MetricType.LATENCY, true); - // newQueryInsightsService.enableCollection(MetricType.CPU, true); - // newQueryInsightsService.enableCollection(MetricType.MEMORY, true); - // newQueryInsightsService.setQueryShapeGenerator(new QueryShapeGenerator(clusterService)); - // newQueryInsightsService.queryInsightsExporterFactory.createExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID, SinkType.LOCAL_INDEX, - // "YYYY.MM.dd", ""); - // - // newQueryInsightsService.deleteExpiredTopNIndices(); - // // Default retention is 7 days - // // Oldest 3 of 10 indices should be deleted - // verify(client, times(3)).admin(); - // verify(adminClient, times(3)).indices(); - // verify(indicesAdminClient, times(3)).delete(any(), any()); - // } + public void testDeleteExpiredTopNIndices() throws InterruptedException { + // Create a new cluster state with expired index mappings + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + QueryInsightsTestUtils.registerAllQueryInsightsSettings(clusterSettings); + // Create a mock cluster state with expired indices + ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary("", true, 1 + randomInt(3), randomInt(2)); + RoutingTable.Builder routingTable = RoutingTable.builder(state.routingTable()); + // Create 9 top_queries-* indices with creation dates older than the retention period + Map indexMetadataMap = new HashMap<>(); + for (int i = 1; i < 10; i++) { + String indexName = "top_queries-2023.01.0" + + i + + "-" + + generateLocalIndexDateHash(ZonedDateTime.now(ZoneOffset.UTC).toLocalDate()); + long creationTime = Instant.now().minus(i + 100, ChronoUnit.DAYS).toEpochMilli(); // Ensure indices are expired + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(SETTING_CREATION_DATE, creationTime) + ) + .putMapping( + new MappingMetadata("_doc", Map.of("_meta", Map.of(QUERY_INSIGHTS_INDEX_TAG_NAME, TOP_QUERIES_INDEX_TAG_VALUE))) + ) + .build(); + indexMetadataMap.put(indexName, indexMetadata); + routingTable.addAsRecovery(indexMetadata); + } + // Update the cluster state with the new indices + ClusterState updatedState = ClusterState.builder(state) + .metadata(Metadata.builder(state.metadata()).indices(indexMetadataMap).build()) + .routingTable(routingTable.build()) + .build(); + // Create a new cluster service with the updated state + newClusterService = ClusterServiceUtils.createClusterService(threadPool, state.getNodes().getLocalNode(), clusterSettings); + ClusterServiceUtils.setState(newClusterService, updatedState); + // Initialize the QueryInsightsService with the new cluster service + newQueryInsightsService = new QueryInsightsService( + newClusterService, + threadPool, + client, + NoopMetricsRegistry.INSTANCE, + namedXContentRegistry + ); + newQueryInsightsService.enableCollection(MetricType.LATENCY, true); + newQueryInsightsService.enableCollection(MetricType.CPU, true); + newQueryInsightsService.enableCollection(MetricType.MEMORY, true); + newQueryInsightsService.setQueryShapeGenerator(new QueryShapeGenerator(newClusterService)); + // Create a local index exporter with a retention period of 7 days + newQueryInsightsService.queryInsightsExporterFactory.createExporter( + TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID, + SinkType.LOCAL_INDEX, + "YYYY.MM.dd", + "" + ); + CountDownLatch latch = new CountDownLatch(9); + doAnswer(invocation -> { + latch.countDown(); + return null; + }).when(indicesAdminClient).delete(any(), any()); + // Call the method under test + newQueryInsightsService.deleteExpiredTopNIndices(); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + // Verify that the correct number of indices are deleted + // Default retention is 7 days, so all 9 indices should be deleted + verify(client, times(9)).admin(); + verify(adminClient, times(9)).indices(); + verify(indicesAdminClient, times(9)).delete(any(), any()); + } }