From fdca61951ae415e3a8724c446278a14050d9f15a Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Fri, 31 Jan 2025 11:27:32 -0800 Subject: [PATCH] Fix default exporter settings (#234) Signed-off-by: Chenyang Ji --- .../plugin/insights/QueryInsightsPlugin.java | 6 +- .../insights/core/exporter/DebugExporter.java | 9 +- .../core/exporter/LocalIndexExporter.java | 10 + .../QueryInsightsExporterFactory.java | 45 ++- .../core/metrics/OperationalMetric.java | 1 + .../reader/QueryInsightsReaderFactory.java | 4 +- .../core/service/QueryInsightsService.java | 150 +++++---- .../core/service/TopQueriesService.java | 39 +-- .../insights/QueryInsightsRestTestCase.java | 2 + .../QueryInsightsExporterFactoryTests.java | 6 +- .../OperationalMetricsCounterTests.java | 2 +- .../QueryInsightsReaderFactoryTests.java | 2 +- .../service/QueryInsightsServiceTests.java | 290 ++++++++++++++---- .../core/service/TopQueriesServiceTests.java | 14 - .../top_queries/TopQueriesRestIT.java | 2 +- 15 files changed, 397 insertions(+), 185 deletions(-) diff --git a/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index d912ccf2..1438b29c 100644 --- a/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -28,8 +28,10 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; import org.opensearch.plugin.insights.core.listener.QueryInsightsListener; import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter; +import org.opensearch.plugin.insights.core.reader.QueryInsightsReaderFactory; import org.opensearch.plugin.insights.core.service.QueryInsightsService; import org.opensearch.plugin.insights.rules.action.health_stats.HealthStatsAction; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; @@ -86,7 +88,9 @@ public Collection createComponents( threadPool, client, metricsRegistry, - xContentRegistry + xContentRegistry, + new QueryInsightsExporterFactory(client, clusterService), + new QueryInsightsReaderFactory(client) ); return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService, false)); } diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java index 2ad34bbc..0a58d4b4 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java @@ -21,7 +21,7 @@ public class DebugExporter implements QueryInsightsExporter { * Logger of the debug exporter */ private final Logger logger = LogManager.getLogger(); - private static final String EXPORTER_ID = "debug_exporter"; + private static final String DEBUG_EXPORTER_ID = "debug_exporter"; /** * Constructor of DebugExporter @@ -30,9 +30,14 @@ private DebugExporter() {} @Override public String getId() { - return EXPORTER_ID; + return DEBUG_EXPORTER_ID; } + /** + * Singleton holder class for the DebugExporter instance. + * A single DebugExporter instance is shared across all services, using the default + * debug exporter identifier EXPORTER_ID. + */ private static class InstanceHolder { private static final DebugExporter INSTANCE = new DebugExporter(); } diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java index 6eee8eb9..59d1621f 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java @@ -88,6 +88,16 @@ public LocalIndexExporter( this.id = id; } + /** + * Retrieves the identifier for the local index exporter. + * + * Each service can either have its own dedicated local index exporter or share + * an existing one. This identifier is used by the QueryInsightsExporterFactory + * to locate and manage the appropriate exporter instance. + * + * @return The identifier of the local index exporter + * @see QueryInsightsExporterFactory + */ @Override public String getId() { return id; diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java index e18f44a6..183d3f44 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java @@ -30,6 +30,9 @@ public class QueryInsightsExporterFactory { private final Logger logger = LogManager.getLogger(); final private Client client; final private ClusterService clusterService; + /** + * Maps exporter identifiers to their corresponding exporter sink instances. + */ final private Map exporters; /** @@ -66,27 +69,35 @@ public void validateExporterType(final String exporterType) throws IllegalArgume } /** - * Create an exporter based on provided parameters + * Create a local index exporter based on provided parameters * - * @param id id of the exporter - * @param type The type of exporter to create + * @param id id of the exporter so that exporters can be retrieved and reused across services * @param indexPattern the index pattern if creating an index exporter * @param indexMapping index mapping file - * @return QueryInsightsExporter the created exporter sink + * @return LocalIndexExporter the created exporter sink */ - public QueryInsightsExporter createExporter(String id, SinkType type, String indexPattern, String indexMapping) { - if (SinkType.LOCAL_INDEX.equals(type)) { - QueryInsightsExporter exporter = new LocalIndexExporter( - client, - clusterService, - DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT), - indexMapping, - id - ); - this.exporters.put(id, exporter); - return exporter; - } - return DebugExporter.getInstance(); + public LocalIndexExporter createLocalIndexExporter(String id, String indexPattern, String indexMapping) { + LocalIndexExporter exporter = new LocalIndexExporter( + client, + clusterService, + DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT), + indexMapping, + id + ); + this.exporters.put(id, exporter); + return exporter; + } + + /** + * Create a debug exporter based on provided parameters + * + * @param id id of the exporter so that exporters can be retrieved and reused across services + * @return DebugExporter the created exporter sink + */ + public DebugExporter createDebugExporter(String id) { + DebugExporter debugExporter = DebugExporter.getInstance(); + this.exporters.put(id, debugExporter); + return debugExporter; } /** diff --git a/src/main/java/org/opensearch/plugin/insights/core/metrics/OperationalMetric.java b/src/main/java/org/opensearch/plugin/insights/core/metrics/OperationalMetric.java index c74ec890..dcb7c366 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/metrics/OperationalMetric.java +++ b/src/main/java/org/opensearch/plugin/insights/core/metrics/OperationalMetric.java @@ -19,6 +19,7 @@ public enum OperationalMetric { DATA_INGEST_EXCEPTIONS("Number of exceptions during data ingest in Query Insights"), QUERY_CATEGORIZE_EXCEPTIONS("Number of exceptions when categorizing the queries"), EXPORTER_FAIL_TO_CLOSE_EXCEPTION("Number of failures when closing the exporter"), + READER_FAIL_TO_CLOSE_EXCEPTION("Number of failures when closing the reader"), TOP_N_QUERIES_USAGE_COUNT("Number of times the top n queries API is used"); private final String description; diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java b/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java index aa7fff4d..cefeda2c 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java @@ -40,13 +40,13 @@ public QueryInsightsReaderFactory(final Client client) { } /** - * Create a Reader based on provided parameters + * Create a Local Index Reader based on provided parameters * * @param indexPattern the index pattern if creating an index Reader * @param namedXContentRegistry for parsing purposes * @return QueryInsightsReader the created Reader */ - public QueryInsightsReader createReader(String id, String indexPattern, NamedXContentRegistry namedXContentRegistry) { + public QueryInsightsReader createLocalIndexReader(String id, String indexPattern, NamedXContentRegistry namedXContentRegistry) { QueryInsightsReader reader = new LocalIndexReader( client, DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT), diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index cde9df71..b68e36d7 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -8,11 +8,13 @@ package org.opensearch.plugin.insights.core.service; -import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID; -import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_READER_ID; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_EXPORTER_ID; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_READER_ID; import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MAX_DELETE_AFTER_VALUE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MIN_DELETE_AFTER_VALUE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_TYPE; @@ -23,6 +25,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.concurrent.LinkedBlockingQueue; @@ -121,6 +124,7 @@ public class QueryInsightsService extends AbstractLifecycleComponent { private QueryShapeGenerator queryShapeGenerator; private final Client client; + SinkType sinkType; /** * Constructor of the QueryInsightsService @@ -137,14 +141,16 @@ public QueryInsightsService( final ThreadPool threadPool, final Client client, final MetricsRegistry metricsRegistry, - final NamedXContentRegistry namedXContentRegistry + final NamedXContentRegistry namedXContentRegistry, + final QueryInsightsExporterFactory queryInsightsExporterFactory, + final QueryInsightsReaderFactory queryInsightsReaderFactory ) { this.clusterService = clusterService; enableCollect = new HashMap<>(); queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY); this.threadPool = threadPool; - this.queryInsightsExporterFactory = new QueryInsightsExporterFactory(client, clusterService); - this.queryInsightsReaderFactory = new QueryInsightsReaderFactory(client); + this.queryInsightsExporterFactory = queryInsightsExporterFactory; + this.queryInsightsReaderFactory = queryInsightsReaderFactory; this.namedXContentRegistry = namedXContentRegistry; this.client = client; // initialize top n queries services and configurations consumers @@ -153,22 +159,25 @@ public QueryInsightsService( enableCollect.put(metricType, false); topQueriesServices.put( metricType, - new TopQueriesService(client, metricType, threadPool, queryInsightsExporterFactory, queryInsightsReaderFactory) + new TopQueriesService(client, metricType, threadPool, this.queryInsightsExporterFactory, this.queryInsightsReaderFactory) ); } clusterService.getClusterSettings() .addSettingsUpdateConsumer( TOP_N_EXPORTER_TYPE, - (v -> setExporterAndReader(SinkType.parse(v), clusterService.state().metadata().indices())), + (v -> setExporterAndReaderType(SinkType.parse(v))), (this::validateExporterType) ); clusterService.getClusterSettings() .addSettingsUpdateConsumer( TOP_N_EXPORTER_DELETE_AFTER, (this::setExporterDeleteAfterAndDelete), - (TopQueriesService::validateExporterDeleteAfter) + (this::validateExporterDeleteAfter) ); + this.setExporterDeleteAfterAndDelete(clusterService.getClusterSettings().get(TOP_N_EXPORTER_DELETE_AFTER)); + this.setExporterAndReaderType(SinkType.parse(clusterService.getClusterSettings().get(TOP_N_EXPORTER_TYPE))); + this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry); this.enableSearchQueryMetricsFeature(false); this.groupingType = DEFAULT_GROUPING_TYPE; @@ -414,57 +423,58 @@ public void setTopNSize(final MetricType type, final int topNSize) { } /** - * Set the exporter and reader config for a metricType + * Set the exporter and reader type config for a metricType * * @param sinkType {@link SinkType} - * @param indexMetadataMap index metadata map in the current cluster */ - private void setExporterAndReader(final SinkType sinkType, final Map indexMetadataMap) { - final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID); - - // This method is invoked when sink type is changed - // Clear local indices if exporter is of type LocalIndexExporter - if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) { - deleteAllTopNIndices(client, indexMetadataMap, (LocalIndexExporter) topQueriesExporter); + public void setExporterAndReaderType(final SinkType sinkType) { + // Configure the exporter for TopQueriesService in QueryInsightsService + final QueryInsightsExporter currentExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_EXPORTER_ID); + final QueryInsightsReader currentReader = queryInsightsReaderFactory.getReader(TOP_QUERIES_READER_ID); + // Handles the cleanup when sink type is changed from LocalIndexExporter. + // Clears all local indices from storage when the exporter configuration + // is switched away from LocalIndexExporter type. + if (this.sinkType == SinkType.LOCAL_INDEX && currentExporter != null) { + deleteAllTopNIndices(client, (LocalIndexExporter) currentExporter); } - if (sinkType != null) { - if (topQueriesExporter != null && sinkType == SinkType.getSinkTypeFromExporter(topQueriesExporter)) { - // this won't happen since we disallowed users to change index patterns. - // But leaving the hook here since we will add support for more sinks and configurations in the future. - queryInsightsExporterFactory.updateExporter(topQueriesExporter, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN); - } else { - try { - queryInsightsExporterFactory.closeExporter(topQueriesExporter); - } catch (IOException e) { - OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.EXPORTER_FAIL_TO_CLOSE_EXCEPTION); - logger.error("Fail to close the current exporter when updating exporter, error: ", e); - } - // this is a new exporter, create it for all underlying services. - queryInsightsExporterFactory.createExporter( - TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID, - sinkType, - DEFAULT_TOP_N_QUERIES_INDEX_PATTERN, - "mappings/top-queries-record.json" - ); - } - } else { - // Disable exporter if exporter type is set to null + // Close the current exporter and reader + if (currentExporter != null) { try { - queryInsightsExporterFactory.closeExporter(topQueriesExporter); + queryInsightsExporterFactory.closeExporter(currentExporter); } catch (IOException e) { OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.EXPORTER_FAIL_TO_CLOSE_EXCEPTION); - logger.error("Fail to close the current exporter when disabling exporter, error: ", e); + logger.error("Fail to close the current exporter when updating exporter and reader, error: ", e); + } + } + if (currentReader != null) { + try { + queryInsightsReaderFactory.closeReader(currentReader); + } catch (IOException e) { + OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.READER_FAIL_TO_CLOSE_EXCEPTION); + logger.error("Fail to close the current reader when updating exporter and reader, error: ", e); } } + // Set sink type to local index for TopQueriesServices + if (sinkType == SinkType.LOCAL_INDEX) { + queryInsightsExporterFactory.createLocalIndexExporter( + TOP_QUERIES_EXPORTER_ID, + DEFAULT_TOP_N_QUERIES_INDEX_PATTERN, + "mappings/top-queries-record.json" + ); + // Set up reader for TopQueriesService + queryInsightsReaderFactory.createLocalIndexReader( + TOP_QUERIES_READER_ID, + DEFAULT_TOP_N_QUERIES_INDEX_PATTERN, + namedXContentRegistry + ); + } + // Set sink type to debug exporter + else if (sinkType == SinkType.DEBUG) { + queryInsightsExporterFactory.createDebugExporter(TOP_QUERIES_EXPORTER_ID); + } - // set up reader for top n queries service - final QueryInsightsReader reader = queryInsightsReaderFactory.createReader( - TOP_QUERIES_LOCAL_INDEX_READER_ID, - DEFAULT_TOP_N_QUERIES_INDEX_PATTERN, - namedXContentRegistry - ); - queryInsightsReaderFactory.updateReader(reader, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN); + this.sinkType = sinkType; } /** @@ -472,12 +482,32 @@ private void setExporterAndReader(final SinkType sinkType, final Map MAX_DELETE_AFTER_VALUE) { + OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.INVALID_EXPORTER_TYPE_FAILURES); + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "Invalid exporter delete_after_days setting [%d], value should be an integer between %d and %d.", + deleteAfter, + MIN_DELETE_AFTER_VALUE, + MAX_DELETE_AFTER_VALUE + ) + ); } - deleteExpiredTopNIndices(); } /** @@ -564,7 +594,7 @@ public QueryInsightsHealthStats getHealthStats() { * Delete Top N local indices older than the configured data retention period */ void deleteExpiredTopNIndices() { - final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID); + final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_EXPORTER_ID); if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) { final LocalIndexExporter localIndexExporter = (LocalIndexExporter) topQueriesExporter; threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> { @@ -586,14 +616,14 @@ void deleteExpiredTopNIndices() { /** * Deletes all Top N local indices * - * @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata} - */ - void deleteAllTopNIndices( - final Client client, - final Map indexMetadataMap, - final LocalIndexExporter localIndexExporter - ) { - indexMetadataMap.entrySet() + * @param client OpenSearch Client + * @param localIndexExporter the exporter to handle the local index operations + */ + void deleteAllTopNIndices(final Client client, final LocalIndexExporter localIndexExporter) { + clusterService.state() + .metadata() + .indices() + .entrySet() .stream() .filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue())) .forEach(entry -> localIndexExporter.deleteSingleIndex(entry.getKey(), client)); diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java index e4cbed12..8f5bc5ef 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -9,8 +9,6 @@ package org.opensearch.plugin.insights.core.service; import static org.opensearch.plugin.insights.core.service.QueryInsightsService.QUERY_INSIGHTS_INDEX_TAG_NAME; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MAX_DELETE_AFTER_VALUE; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MIN_DELETE_AFTER_VALUE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR; import java.io.IOException; @@ -63,8 +61,17 @@ * with high latency or resource usage */ public class TopQueriesService { - public static final String TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID = "top_queries_local_index_exporter"; - public static final String TOP_QUERIES_LOCAL_INDEX_READER_ID = "top_queries_local_index_reader"; + /** + * Top queries services utilize a shared exporter and reader instance. + * These shared components are uniquely identified by TOP_QUERIES_EXPORTER_ID and TOP_QUERIES_READER_ID + */ + public static final String TOP_QUERIES_EXPORTER_ID = "top_queries_exporter"; + public static final String TOP_QUERIES_READER_ID = "top_queries_reader"; + /** + * Tag value used to identify local index mappings that are specifically created + * by TopQueriesService. This tag serves as a unique identifier for tracking and + * managing indices associated with top queries operations. + */ public static final String TOP_QUERIES_INDEX_TAG_VALUE = "top_n_queries"; private static final String METRIC_TYPE_TAG = "metric_type"; private static final String GROUPBY_TAG = "groupby"; @@ -354,7 +361,7 @@ public List getTopQueriesRecordsFromIndex(final String from, } final List queries = new ArrayList<>(); - final QueryInsightsReader reader = queryInsightsReaderFactory.getReader(TOP_QUERIES_LOCAL_INDEX_READER_ID); + final QueryInsightsReader reader = queryInsightsReaderFactory.getReader(TOP_QUERIES_READER_ID); if (reader != null) { try { final ZonedDateTime start = ZonedDateTime.parse(from); @@ -444,7 +451,7 @@ private void rotateWindowIfNecessary(final long newWindowStart) { topQueriesCurrentSnapshot.set(new ArrayList<>()); windowStart = newWindowStart; // export to the configured sink - QueryInsightsExporter exporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID); + QueryInsightsExporter exporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_EXPORTER_ID); if (exporter != null) { threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> exporter.export(history)); } @@ -498,26 +505,6 @@ public TopQueriesHealthStats getHealthStats() { return new TopQueriesHealthStats(this.topQueriesStore.size(), this.queryGrouper.getHealthStats()); } - /** - * Validate the exporter delete after value - * - * @param deleteAfter exporter and reader settings - */ - static void validateExporterDeleteAfter(final int deleteAfter) { - if (deleteAfter < MIN_DELETE_AFTER_VALUE || deleteAfter > MAX_DELETE_AFTER_VALUE) { - OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.INVALID_EXPORTER_TYPE_FAILURES); - throw new IllegalArgumentException( - String.format( - Locale.ROOT, - "Invalid exporter delete_after_days setting [%d], value should be an integer between %d and %d.", - deleteAfter, - MIN_DELETE_AFTER_VALUE, - MAX_DELETE_AFTER_VALUE - ) - ); - } - } - /** * Validates if the input string is a Query Insights local index * in the format "top_queries-YYYY.MM.dd-XXXXX", and has the expected index metadata. diff --git a/src/test/java/org/opensearch/plugin/insights/QueryInsightsRestTestCase.java b/src/test/java/org/opensearch/plugin/insights/QueryInsightsRestTestCase.java index b7e73e03..0bb6194a 100644 --- a/src/test/java/org/opensearch/plugin/insights/QueryInsightsRestTestCase.java +++ b/src/test/java/org/opensearch/plugin/insights/QueryInsightsRestTestCase.java @@ -191,6 +191,8 @@ protected String defaultTopQueriesSettings() { + " \"search.insights.top_queries.latency.enabled\" : \"true\",\n" + " \"search.insights.top_queries.latency.window_size\" : \"1m\",\n" + " \"search.insights.top_queries.latency.top_n_size\" : 5,\n" + + " \"search.insights.top_queries.memory.enabled\" : \"false\",\n" + + " \"search.insights.top_queries.cpu.enabled\" : \"false\",\n" + " \"search.insights.top_queries.group_by\" : \"none\"\n" + " }\n" + "}"; diff --git a/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java b/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java index 4afb3c7e..8c104530 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java @@ -66,11 +66,11 @@ public void testInvalidExporterTypeConfig() { } public void testCreateAndCloseExporter() { - QueryInsightsExporter exporter1 = queryInsightsExporterFactory.createExporter("id", SinkType.LOCAL_INDEX, format, ""); + QueryInsightsExporter exporter1 = queryInsightsExporterFactory.createLocalIndexExporter("id-index", format, ""); assertTrue(exporter1 instanceof LocalIndexExporter); - QueryInsightsExporter exporter2 = queryInsightsExporterFactory.createExporter("id", SinkType.DEBUG, format, ""); + QueryInsightsExporter exporter2 = queryInsightsExporterFactory.createDebugExporter("id-debug"); assertTrue(exporter2 instanceof DebugExporter); - QueryInsightsExporter exporter3 = queryInsightsExporterFactory.createExporter("id", SinkType.DEBUG, format, ""); + QueryInsightsExporter exporter3 = queryInsightsExporterFactory.createDebugExporter("id-debug2"); assertTrue(exporter3 instanceof DebugExporter); try { queryInsightsExporterFactory.closeExporter(exporter1); diff --git a/src/test/java/org/opensearch/plugin/insights/core/metrics/OperationalMetricsCounterTests.java b/src/test/java/org/opensearch/plugin/insights/core/metrics/OperationalMetricsCounterTests.java index b3d4ab0c..e9d9b1e1 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/metrics/OperationalMetricsCounterTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/metrics/OperationalMetricsCounterTests.java @@ -35,7 +35,7 @@ public void testSingletonInitializationAndIncrement() { OperationalMetricsCounter.initialize(CLUSTER_NAME, metricsRegistry); OperationalMetricsCounter instance = OperationalMetricsCounter.getInstance(); ArgumentCaptor nameCaptor = ArgumentCaptor.forClass(String.class); - verify(metricsRegistry, times(9)).createCounter(nameCaptor.capture(), any(), eq("1")); + verify(metricsRegistry, times(10)).createCounter(nameCaptor.capture(), any(), eq("1")); assertNotNull(instance); instance.incrementCounter(OperationalMetric.LOCAL_INDEX_READER_PARSING_EXCEPTIONS); instance.incrementCounter(OperationalMetric.LOCAL_INDEX_READER_PARSING_EXCEPTIONS); diff --git a/src/test/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactoryTests.java b/src/test/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactoryTests.java index 2a3537eb..ce2c9bae 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactoryTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactoryTests.java @@ -45,7 +45,7 @@ public void setup() { } public void testCreateAndCloseReader() { - QueryInsightsReader reader1 = queryInsightsReaderFactory.createReader("id", format, namedXContentRegistry); + QueryInsightsReader reader1 = queryInsightsReaderFactory.createLocalIndexReader("id", format, namedXContentRegistry); assertTrue(reader1 instanceof LocalIndexReader); try { queryInsightsReaderFactory.closeReader(reader1); diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java index d4b03e1d..fccc0063 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -9,7 +9,12 @@ package org.opensearch.plugin.insights.core.service; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -17,8 +22,8 @@ import static org.mockito.Mockito.when; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; import static org.opensearch.plugin.insights.core.service.QueryInsightsService.QUERY_INSIGHTS_INDEX_TAG_NAME; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_EXPORTER_ID; import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_INDEX_TAG_VALUE; -import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.ENTRY_COUNT; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.EVICTIONS; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.HIT_COUNT; @@ -26,6 +31,7 @@ import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.SIZE_IN_BYTES; import static org.opensearch.plugin.insights.core.utils.ExporterReaderUtils.generateLocalIndexDateHash; +import java.io.IOException; import java.time.Instant; import java.time.ZoneId; import java.time.ZoneOffset; @@ -50,15 +56,20 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.core.exporter.DebugExporter; import org.opensearch.plugin.insights.core.exporter.LocalIndexExporter; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; import org.opensearch.plugin.insights.core.exporter.SinkType; import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter; +import org.opensearch.plugin.insights.core.reader.QueryInsightsReader; +import org.opensearch.plugin.insights.core.reader.QueryInsightsReaderFactory; import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator; import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.MetricType; @@ -84,17 +95,23 @@ public class QueryInsightsServiceTests extends OpenSearchTestCase { private final Client client = mock(Client.class); private final NamedXContentRegistry namedXContentRegistry = mock(NamedXContentRegistry.class); private QueryInsightsService queryInsightsService; - private QueryInsightsService newQueryInsightsService; private QueryInsightsService queryInsightsServiceSpy; private final AdminClient adminClient = mock(AdminClient.class); private final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); private ClusterService clusterService; - private ClusterService newClusterService; - private LocalIndexExporter localIndexExporter; + private LocalIndexExporter mockLocalIndexExporter; + private DebugExporter mockDebugExporter; + private QueryInsightsReader mockReader; + private QueryInsightsExporterFactory queryInsightsExporterFactory; + private QueryInsightsReaderFactory queryInsightsReaderFactory; @Before public void setup() { - localIndexExporter = mock(LocalIndexExporter.class); + queryInsightsExporterFactory = mock(QueryInsightsExporterFactory.class); + queryInsightsReaderFactory = mock(QueryInsightsReaderFactory.class); + mockLocalIndexExporter = mock(LocalIndexExporter.class); + mockDebugExporter = mock(DebugExporter.class); + mockReader = mock(QueryInsightsReader.class); Settings.Builder settingsBuilder = Settings.builder(); Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); @@ -109,7 +126,9 @@ public void setup() { threadPool, client, NoopMetricsRegistry.INSTANCE, - namedXContentRegistry + namedXContentRegistry, + queryInsightsExporterFactory, + queryInsightsReaderFactory ); queryInsightsService.enableCollection(MetricType.LATENCY, true); queryInsightsService.enableCollection(MetricType.CPU, true); @@ -133,18 +152,9 @@ public void tearDown() throws Exception { if (clusterService != null) { IOUtils.close(clusterService); } - if (newClusterService != null) { - IOUtils.close(newClusterService); - } if (queryInsightsService != null) { queryInsightsService.doClose(); } - if (newClusterService != null) { - IOUtils.close(newClusterService); - } - if (newQueryInsightsService != null) { - newQueryInsightsService.doClose(); - } ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } @@ -282,7 +292,7 @@ public void testGetHealthStats() { assertTrue(fieldTypeCacheStats.containsKey(MISS_COUNT)); } - public void testDeleteAllTopNIndices() { + public void testDeleteAllTopNIndices() throws IOException { // Create 9 top_queries-* indices Map indexMetadataMap = new HashMap<>(); for (int i = 1; i < 10; i++) { @@ -325,21 +335,20 @@ public void testDeleteAllTopNIndices() { .build(); indexMetadataMap.put(indexName, indexMetadata); } + List updatedService = createQueryInsightsServiceWithIndexState(indexMetadataMap); + QueryInsightsService updatedQueryInsightsService = (QueryInsightsService) updatedService.get(0); + ClusterService updatedClusterService = (ClusterService) updatedService.get(1); - queryInsightsService.deleteAllTopNIndices(client, indexMetadataMap, localIndexExporter); - // All 10 indices should be deleted - verify(localIndexExporter, times(9)).deleteSingleIndex(any(), any()); + updatedQueryInsightsService.deleteAllTopNIndices(client, mockLocalIndexExporter); + // All 10 top_queries-* indices should be deleted, while none of the users indices should be deleted + verify(mockLocalIndexExporter, times(9)).deleteSingleIndex(argThat(str -> str.matches("top_queries-.*")), any()); + + IOUtils.close(updatedClusterService); + updatedQueryInsightsService.doClose(); } - public void testDeleteExpiredTopNIndices() throws InterruptedException { - // Create a new cluster state with expired index mappings - Settings.Builder settingsBuilder = Settings.builder(); - Settings settings = settingsBuilder.build(); - ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - QueryInsightsTestUtils.registerAllQueryInsightsSettings(clusterSettings); - // Create a mock cluster state with expired indices - ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary("", true, 1 + randomInt(3), randomInt(2)); - RoutingTable.Builder routingTable = RoutingTable.builder(state.routingTable()); + public void testDeleteExpiredTopNIndices() throws InterruptedException, IOException { + // Test with a new cluster state with expired index mappings // Create 9 top_queries-* indices with creation dates older than the retention period Map indexMetadataMap = new HashMap<>(); for (int i = 1; i < 10; i++) { @@ -361,42 +370,17 @@ public void testDeleteExpiredTopNIndices() throws InterruptedException { ) .build(); indexMetadataMap.put(indexName, indexMetadata); - routingTable.addAsRecovery(indexMetadata); } - // Update the cluster state with the new indices - ClusterState updatedState = ClusterState.builder(state) - .metadata(Metadata.builder(state.metadata()).indices(indexMetadataMap).build()) - .routingTable(routingTable.build()) - .build(); - // Create a new cluster service with the updated state - newClusterService = ClusterServiceUtils.createClusterService(threadPool, state.getNodes().getLocalNode(), clusterSettings); - ClusterServiceUtils.setState(newClusterService, updatedState); - // Initialize the QueryInsightsService with the new cluster service - newQueryInsightsService = new QueryInsightsService( - newClusterService, - threadPool, - client, - NoopMetricsRegistry.INSTANCE, - namedXContentRegistry - ); - newQueryInsightsService.enableCollection(MetricType.LATENCY, true); - newQueryInsightsService.enableCollection(MetricType.CPU, true); - newQueryInsightsService.enableCollection(MetricType.MEMORY, true); - newQueryInsightsService.setQueryShapeGenerator(new QueryShapeGenerator(newClusterService)); - // Create a local index exporter with a retention period of 7 days - newQueryInsightsService.queryInsightsExporterFactory.createExporter( - TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID, - SinkType.LOCAL_INDEX, - "YYYY.MM.dd", - "" - ); + List updatedService = createQueryInsightsServiceWithIndexState(indexMetadataMap); + QueryInsightsService updatedQueryInsightsService = (QueryInsightsService) updatedService.get(0); + ClusterService updatedClusterService = (ClusterService) updatedService.get(1); CountDownLatch latch = new CountDownLatch(9); doAnswer(invocation -> { latch.countDown(); return null; }).when(indicesAdminClient).delete(any(), any()); // Call the method under test - newQueryInsightsService.deleteExpiredTopNIndices(); + updatedQueryInsightsService.deleteExpiredTopNIndices(); assertTrue(latch.await(10, TimeUnit.SECONDS)); // Verify that the correct number of indices are deleted @@ -404,5 +388,197 @@ public void testDeleteExpiredTopNIndices() throws InterruptedException { verify(client, times(9)).admin(); verify(adminClient, times(9)).indices(); verify(indicesAdminClient, times(9)).delete(any(), any()); + + IOUtils.close(updatedClusterService); + updatedQueryInsightsService.doClose(); + } + + public void testValidateExporterDeleteAfter() { + this.queryInsightsService.validateExporterDeleteAfter(7); + this.queryInsightsService.validateExporterDeleteAfter(180); + this.queryInsightsService.validateExporterDeleteAfter(1); + assertThrows(IllegalArgumentException.class, () -> { this.queryInsightsService.validateExporterDeleteAfter(-1); }); + assertThrows(IllegalArgumentException.class, () -> { this.queryInsightsService.validateExporterDeleteAfter(0); }); + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + this.queryInsightsService.validateExporterDeleteAfter(181); + }); + assertEquals( + "Invalid exporter delete_after_days setting [181], value should be an integer between 1 and 180.", + exception.getMessage() + ); + } + + public void testSetExporterAndReaderType_SwitchFromLocalIndexToNone() throws IOException { + // Mock current exporter and reader + queryInsightsServiceSpy.sinkType = SinkType.LOCAL_INDEX; + doNothing().when(queryInsightsServiceSpy).deleteAllTopNIndices(any(), any()); + // Mock current exporter and reader + when(queryInsightsExporterFactory.getExporter(TopQueriesService.TOP_QUERIES_EXPORTER_ID)).thenReturn(mockLocalIndexExporter); + when(queryInsightsReaderFactory.getReader(TopQueriesService.TOP_QUERIES_READER_ID)).thenReturn(mockReader); + + // Execute method + queryInsightsServiceSpy.setExporterAndReaderType(SinkType.NONE); + // Verify cleanup of local indices + verify(queryInsightsServiceSpy, times(1)).deleteAllTopNIndices(client, mockLocalIndexExporter); + verify(queryInsightsExporterFactory, times(1)).closeExporter(mockLocalIndexExporter); + verify(queryInsightsReaderFactory, times(1)).closeReader(mockReader); + // Verify exporter is set to NONE + assertEquals(SinkType.NONE, queryInsightsServiceSpy.sinkType); + } + + public void testSetExporterAndReaderType_SwitchFromLocalIndexToDebug() throws IOException { + // Mock current exporter and reader + queryInsightsServiceSpy.sinkType = SinkType.LOCAL_INDEX; + doNothing().when(queryInsightsServiceSpy).deleteAllTopNIndices(any(), any()); + // Mock current exporter and reader + when(queryInsightsExporterFactory.getExporter(TopQueriesService.TOP_QUERIES_EXPORTER_ID)).thenReturn(mockLocalIndexExporter); + when(queryInsightsReaderFactory.getReader(TopQueriesService.TOP_QUERIES_READER_ID)).thenReturn(mockReader); + + // Execute method + queryInsightsServiceSpy.setExporterAndReaderType(SinkType.DEBUG); + // Verify cleanup of local indices + verify(queryInsightsServiceSpy, times(1)).deleteAllTopNIndices(client, mockLocalIndexExporter); + verify(queryInsightsExporterFactory, times(1)).closeExporter(mockLocalIndexExporter); + verify(queryInsightsReaderFactory, times(1)).closeReader(mockReader); + // Verify exporter is set to NONE + assertEquals(SinkType.DEBUG, queryInsightsServiceSpy.sinkType); + } + + public void testSetExporterAndReaderType_SwitchFromNoneToLocalIndex() throws IOException { + queryInsightsServiceSpy.sinkType = SinkType.NONE; + // Mock current exporter and reader + when(queryInsightsExporterFactory.getExporter(TopQueriesService.TOP_QUERIES_EXPORTER_ID)).thenReturn(null); + when(queryInsightsReaderFactory.getReader(TopQueriesService.TOP_QUERIES_READER_ID)).thenReturn(null); + // Execute method + queryInsightsServiceSpy.setExporterAndReaderType(SinkType.LOCAL_INDEX); + // Verify new local index exporter setup + // 2 times, one for initialization, one for the above method call + verify(queryInsightsExporterFactory, times(2)).createLocalIndexExporter( + eq(TopQueriesService.TOP_QUERIES_EXPORTER_ID), + anyString(), + anyString() + ); + verify(queryInsightsReaderFactory, times(2)).createLocalIndexReader( + eq(TopQueriesService.TOP_QUERIES_READER_ID), + anyString(), + eq(namedXContentRegistry) + ); + verify(queryInsightsExporterFactory, times(0)).closeExporter(any()); + verify(queryInsightsReaderFactory, times(0)).closeReader(any()); + verify(queryInsightsServiceSpy, times(0)).deleteAllTopNIndices(any(), any()); + assertEquals(SinkType.LOCAL_INDEX, queryInsightsServiceSpy.sinkType); + } + + public void testSetExporterAndReaderType_SwitchFromNoneToDebug() throws IOException { + queryInsightsServiceSpy.sinkType = SinkType.NONE; + // Mock current exporter and reader + when(queryInsightsExporterFactory.getExporter(TopQueriesService.TOP_QUERIES_EXPORTER_ID)).thenReturn(null); + when(queryInsightsReaderFactory.getReader(TopQueriesService.TOP_QUERIES_READER_ID)).thenReturn(null); + // Execute method + queryInsightsServiceSpy.setExporterAndReaderType(SinkType.DEBUG); + // Verify new local index exporter setup + verify(queryInsightsExporterFactory, times(1)).createDebugExporter(eq(TopQueriesService.TOP_QUERIES_EXPORTER_ID)); + verify(queryInsightsServiceSpy, times(0)).deleteAllTopNIndices(any(), any()); + verify(queryInsightsExporterFactory, times(0)).closeExporter(any()); + verify(queryInsightsReaderFactory, times(0)).closeReader(any()); + assertEquals(SinkType.DEBUG, queryInsightsServiceSpy.sinkType); + } + + public void testSetExporterAndReaderType_SwitchFromDebugToLocalIndex() throws IOException { + queryInsightsServiceSpy.sinkType = SinkType.DEBUG; + // Mock current exporter and reader + when(queryInsightsExporterFactory.getExporter(TopQueriesService.TOP_QUERIES_EXPORTER_ID)).thenReturn(mockDebugExporter); + when(queryInsightsReaderFactory.getReader(TopQueriesService.TOP_QUERIES_READER_ID)).thenReturn(mockReader); + // Execute method + queryInsightsServiceSpy.setExporterAndReaderType(SinkType.LOCAL_INDEX); + // Verify new local index exporter setup + // 2 times, one for initialization, one for the above method call + verify(queryInsightsExporterFactory, times(2)).createLocalIndexExporter( + eq(TopQueriesService.TOP_QUERIES_EXPORTER_ID), + anyString(), + anyString() + ); + verify(queryInsightsReaderFactory, times(2)).createLocalIndexReader( + eq(TopQueriesService.TOP_QUERIES_READER_ID), + anyString(), + eq(namedXContentRegistry) + ); + verify(queryInsightsExporterFactory, times(1)).closeExporter(mockDebugExporter); + verify(queryInsightsReaderFactory, times(1)).closeReader(mockReader); + verify(queryInsightsServiceSpy, times(0)).deleteAllTopNIndices(any(), any()); + assertEquals(SinkType.LOCAL_INDEX, queryInsightsServiceSpy.sinkType); + } + + public void testSetExporterAndReaderType_SwitchFromDebugToNone() throws IOException { + queryInsightsServiceSpy.sinkType = SinkType.DEBUG; + // Mock current exporter and reader + when(queryInsightsExporterFactory.getExporter(TopQueriesService.TOP_QUERIES_EXPORTER_ID)).thenReturn(mockDebugExporter); + when(queryInsightsReaderFactory.getReader(TopQueriesService.TOP_QUERIES_READER_ID)).thenReturn(mockReader); + // Execute method + queryInsightsServiceSpy.setExporterAndReaderType(SinkType.NONE); + // Verify new local index exporter setup + // 2 times, one for initialization, one for the above method call + verify(queryInsightsServiceSpy, times(0)).deleteAllTopNIndices(client, mockLocalIndexExporter); + verify(queryInsightsExporterFactory, times(1)).closeExporter(mockDebugExporter); + verify(queryInsightsReaderFactory, times(1)).closeReader(mockReader); + assertEquals(SinkType.NONE, queryInsightsServiceSpy.sinkType); + } + + public void testSetExporterAndReaderType_CloseWithException() throws IOException { + queryInsightsServiceSpy.sinkType = SinkType.LOCAL_INDEX; + doNothing().when(queryInsightsServiceSpy).deleteAllTopNIndices(any(), any()); + // Mock current exporter that throws an exception when closing + when(queryInsightsExporterFactory.getExporter(TopQueriesService.TOP_QUERIES_EXPORTER_ID)).thenReturn(mockLocalIndexExporter); + when(queryInsightsReaderFactory.getReader(TopQueriesService.TOP_QUERIES_READER_ID)).thenReturn(mockReader); + doThrow(new IOException("Exporter close error")).when(queryInsightsExporterFactory).closeExporter(mockLocalIndexExporter); + doThrow(new IOException("Reader close error")).when(queryInsightsReaderFactory).closeReader(mockReader); + // Execute method + queryInsightsServiceSpy.setExporterAndReaderType(SinkType.DEBUG); + // Verify exception handling + verify(queryInsightsExporterFactory, times(1)).closeExporter(mockLocalIndexExporter); + verify(queryInsightsReaderFactory, times(1)).closeReader(any()); + // Ensure new exporter is still created + verify(queryInsightsExporterFactory, times(1)).createDebugExporter(TopQueriesService.TOP_QUERIES_EXPORTER_ID); + } + + // Util functions + private List createQueryInsightsServiceWithIndexState(Map indexMetadataMap) { + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + QueryInsightsTestUtils.registerAllQueryInsightsSettings(clusterSettings); + // Create a mock cluster state with expired indices + ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary("", true, 1 + randomInt(3), randomInt(2)); + RoutingTable.Builder routingTable = RoutingTable.builder(state.routingTable()); + indexMetadataMap.forEach((indexName, indexMetadata) -> { routingTable.addAsRecovery(indexMetadata); }); + // Update the cluster state with the new indices + ClusterState updatedState = ClusterState.builder(state) + .metadata(Metadata.builder(state.metadata()).indices(indexMetadataMap).build()) + .routingTable(routingTable.build()) + .build(); + // Create a new cluster service with the updated state + ClusterService updatedClusterService = ClusterServiceUtils.createClusterService( + threadPool, + state.getNodes().getLocalNode(), + clusterSettings + ); + ClusterServiceUtils.setState(updatedClusterService, updatedState); + // Initialize the QueryInsightsService with the new cluster service + QueryInsightsService updatedQueryInsightsService = new QueryInsightsService( + updatedClusterService, + threadPool, + client, + NoopMetricsRegistry.INSTANCE, + namedXContentRegistry, + new QueryInsightsExporterFactory(client, clusterService), + new QueryInsightsReaderFactory(client) + ); + updatedQueryInsightsService.enableCollection(MetricType.LATENCY, true); + updatedQueryInsightsService.enableCollection(MetricType.CPU, true); + updatedQueryInsightsService.enableCollection(MetricType.MEMORY, true); + updatedQueryInsightsService.setQueryShapeGenerator(new QueryShapeGenerator(updatedClusterService)); + // Create a local index exporter with a retention period of 7 days + updatedQueryInsightsService.queryInsightsExporterFactory.createLocalIndexExporter(TOP_QUERIES_EXPORTER_ID, "YYYY.MM.dd", ""); + return List.of(updatedQueryInsightsService, updatedClusterService); } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java index 0971fc25..ece5702d 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java @@ -15,7 +15,6 @@ import static org.opensearch.plugin.insights.core.service.QueryInsightsService.QUERY_INSIGHTS_INDEX_TAG_NAME; import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_INDEX_TAG_VALUE; import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex; -import static org.opensearch.plugin.insights.core.service.TopQueriesService.validateExporterDeleteAfter; import java.time.Instant; import java.util.List; @@ -207,19 +206,6 @@ public void testGetHealthStats_WithData() { assertEquals(0, healthStats.getQueryGrouperHealthStats().getQueryGroupCount()); } - public void testValidateExporterDeleteAfter() { - validateExporterDeleteAfter(7); - validateExporterDeleteAfter(180); - validateExporterDeleteAfter(1); - assertThrows(IllegalArgumentException.class, () -> { validateExporterDeleteAfter(-1); }); - assertThrows(IllegalArgumentException.class, () -> { validateExporterDeleteAfter(0); }); - IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { validateExporterDeleteAfter(181); }); - assertEquals( - "Invalid exporter delete_after_days setting [181], value should be an integer between 1 and 180.", - exception.getMessage() - ); - } - private IndexMetadata createValidIndexMetadata(String indexName) { // valid index metadata long creationTime = Instant.now().toEpochMilli(); diff --git a/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/TopQueriesRestIT.java b/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/TopQueriesRestIT.java index b2d188da..726c0684 100644 --- a/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/TopQueriesRestIT.java +++ b/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/TopQueriesRestIT.java @@ -49,7 +49,7 @@ public void testQueryInsightsPluginInstalled() throws IOException { public void testTopQueriesResponses() throws IOException, InterruptedException { waitForEmptyTopQueriesResponse(); - // Enable Top N Queries feature + // Enable only Top N Queries by latency feature updateClusterSettings(this::defaultTopQueriesSettings); doSearch(2);