Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Bug fix] Fix default exporter settings #236

Merged
merged 1 commit into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.core.listener.QueryInsightsListener;
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
import org.opensearch.plugin.insights.core.reader.QueryInsightsReaderFactory;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.rules.action.health_stats.HealthStatsAction;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
Expand Down Expand Up @@ -86,7 +88,9 @@ public Collection<Object> createComponents(
threadPool,
client,
metricsRegistry,
xContentRegistry
xContentRegistry,
new QueryInsightsExporterFactory(client, clusterService),
new QueryInsightsReaderFactory(client)
);
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService, false));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class DebugExporter implements QueryInsightsExporter {
* Logger of the debug exporter
*/
private final Logger logger = LogManager.getLogger();
private static final String EXPORTER_ID = "debug_exporter";
private static final String DEBUG_EXPORTER_ID = "debug_exporter";

/**
* Constructor of DebugExporter
Expand All @@ -30,9 +30,14 @@ private DebugExporter() {}

@Override
public String getId() {
return EXPORTER_ID;
return DEBUG_EXPORTER_ID;
}

/**
* Singleton holder class for the DebugExporter instance.
* A single DebugExporter instance is shared across all services, using the default
* debug exporter identifier EXPORTER_ID.
*/
private static class InstanceHolder {
private static final DebugExporter INSTANCE = new DebugExporter();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ public LocalIndexExporter(
this.id = id;
}

/**
* Retrieves the identifier for the local index exporter.
*
* Each service can either have its own dedicated local index exporter or share
* an existing one. This identifier is used by the QueryInsightsExporterFactory
* to locate and manage the appropriate exporter instance.
*
* @return The identifier of the local index exporter
* @see QueryInsightsExporterFactory
*/
@Override
public String getId() {
return id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class QueryInsightsExporterFactory {
private final Logger logger = LogManager.getLogger();
final private Client client;
final private ClusterService clusterService;
/**
* Maps exporter identifiers to their corresponding exporter sink instances.
*/
final private Map<String, QueryInsightsExporter> exporters;

/**
Expand Down Expand Up @@ -66,27 +69,35 @@ public void validateExporterType(final String exporterType) throws IllegalArgume
}

/**
* Create an exporter based on provided parameters
* Create a local index exporter based on provided parameters
*
* @param id id of the exporter
* @param type The type of exporter to create
* @param id id of the exporter so that exporters can be retrieved and reused across services
* @param indexPattern the index pattern if creating an index exporter
* @param indexMapping index mapping file
* @return QueryInsightsExporter the created exporter sink
* @return LocalIndexExporter the created exporter sink
*/
public QueryInsightsExporter createExporter(String id, SinkType type, String indexPattern, String indexMapping) {
if (SinkType.LOCAL_INDEX.equals(type)) {
QueryInsightsExporter exporter = new LocalIndexExporter(
client,
clusterService,
DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT),
indexMapping,
id
);
this.exporters.put(id, exporter);
return exporter;
}
return DebugExporter.getInstance();
public LocalIndexExporter createLocalIndexExporter(String id, String indexPattern, String indexMapping) {
LocalIndexExporter exporter = new LocalIndexExporter(
client,
clusterService,
DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT),
indexMapping,
id
);
this.exporters.put(id, exporter);
return exporter;
}

/**
* Create a debug exporter based on provided parameters
*
* @param id id of the exporter so that exporters can be retrieved and reused across services
* @return DebugExporter the created exporter sink
*/
public DebugExporter createDebugExporter(String id) {
DebugExporter debugExporter = DebugExporter.getInstance();
this.exporters.put(id, debugExporter);
return debugExporter;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public enum OperationalMetric {
DATA_INGEST_EXCEPTIONS("Number of exceptions during data ingest in Query Insights"),
QUERY_CATEGORIZE_EXCEPTIONS("Number of exceptions when categorizing the queries"),
EXPORTER_FAIL_TO_CLOSE_EXCEPTION("Number of failures when closing the exporter"),
READER_FAIL_TO_CLOSE_EXCEPTION("Number of failures when closing the reader"),
TOP_N_QUERIES_USAGE_COUNT("Number of times the top n queries API is used");

private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ public QueryInsightsReaderFactory(final Client client) {
}

/**
* Create a Reader based on provided parameters
* Create a Local Index Reader based on provided parameters
*
* @param indexPattern the index pattern if creating an index Reader
* @param namedXContentRegistry for parsing purposes
* @return QueryInsightsReader the created Reader
*/
public QueryInsightsReader createReader(String id, String indexPattern, NamedXContentRegistry namedXContentRegistry) {
public QueryInsightsReader createLocalIndexReader(String id, String indexPattern, NamedXContentRegistry namedXContentRegistry) {
QueryInsightsReader reader = new LocalIndexReader(
client,
DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

package org.opensearch.plugin.insights.core.service;

import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID;
import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_READER_ID;
import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_EXPORTER_ID;
import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_READER_ID;
import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MAX_DELETE_AFTER_VALUE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MIN_DELETE_AFTER_VALUE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_TYPE;
Expand All @@ -23,6 +25,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -121,6 +124,7 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
private QueryShapeGenerator queryShapeGenerator;

private final Client client;
SinkType sinkType;

/**
* Constructor of the QueryInsightsService
Expand All @@ -137,14 +141,16 @@ public QueryInsightsService(
final ThreadPool threadPool,
final Client client,
final MetricsRegistry metricsRegistry,
final NamedXContentRegistry namedXContentRegistry
final NamedXContentRegistry namedXContentRegistry,
final QueryInsightsExporterFactory queryInsightsExporterFactory,
final QueryInsightsReaderFactory queryInsightsReaderFactory
) {
this.clusterService = clusterService;
enableCollect = new HashMap<>();
queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY);
this.threadPool = threadPool;
this.queryInsightsExporterFactory = new QueryInsightsExporterFactory(client, clusterService);
this.queryInsightsReaderFactory = new QueryInsightsReaderFactory(client);
this.queryInsightsExporterFactory = queryInsightsExporterFactory;
this.queryInsightsReaderFactory = queryInsightsReaderFactory;
this.namedXContentRegistry = namedXContentRegistry;
this.client = client;
// initialize top n queries services and configurations consumers
Expand All @@ -153,22 +159,25 @@ public QueryInsightsService(
enableCollect.put(metricType, false);
topQueriesServices.put(
metricType,
new TopQueriesService(client, metricType, threadPool, queryInsightsExporterFactory, queryInsightsReaderFactory)
new TopQueriesService(client, metricType, threadPool, this.queryInsightsExporterFactory, this.queryInsightsReaderFactory)
);
}
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_EXPORTER_TYPE,
(v -> setExporterAndReader(SinkType.parse(v), clusterService.state().metadata().indices())),
(v -> setExporterAndReaderType(SinkType.parse(v))),
(this::validateExporterType)
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_EXPORTER_DELETE_AFTER,
(this::setExporterDeleteAfterAndDelete),
(TopQueriesService::validateExporterDeleteAfter)
(this::validateExporterDeleteAfter)
);

this.setExporterDeleteAfterAndDelete(clusterService.getClusterSettings().get(TOP_N_EXPORTER_DELETE_AFTER));
this.setExporterAndReaderType(SinkType.parse(clusterService.getClusterSettings().get(TOP_N_EXPORTER_TYPE)));

this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry);
this.enableSearchQueryMetricsFeature(false);
this.groupingType = DEFAULT_GROUPING_TYPE;
Expand Down Expand Up @@ -414,70 +423,91 @@ public void setTopNSize(final MetricType type, final int topNSize) {
}

/**
* Set the exporter and reader config for a metricType
* Set the exporter and reader type config for a metricType
*
* @param sinkType {@link SinkType}
* @param indexMetadataMap index metadata map in the current cluster
*/
private void setExporterAndReader(final SinkType sinkType, final Map<String, IndexMetadata> indexMetadataMap) {
final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID);

// This method is invoked when sink type is changed
// Clear local indices if exporter is of type LocalIndexExporter
if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) {
deleteAllTopNIndices(client, indexMetadataMap, (LocalIndexExporter) topQueriesExporter);
public void setExporterAndReaderType(final SinkType sinkType) {
// Configure the exporter for TopQueriesService in QueryInsightsService
final QueryInsightsExporter currentExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_EXPORTER_ID);
final QueryInsightsReader currentReader = queryInsightsReaderFactory.getReader(TOP_QUERIES_READER_ID);
// Handles the cleanup when sink type is changed from LocalIndexExporter.
// Clears all local indices from storage when the exporter configuration
// is switched away from LocalIndexExporter type.
if (this.sinkType == SinkType.LOCAL_INDEX && currentExporter != null) {
deleteAllTopNIndices(client, (LocalIndexExporter) currentExporter);
}

if (sinkType != null) {
if (topQueriesExporter != null && sinkType == SinkType.getSinkTypeFromExporter(topQueriesExporter)) {
// this won't happen since we disallowed users to change index patterns.
// But leaving the hook here since we will add support for more sinks and configurations in the future.
queryInsightsExporterFactory.updateExporter(topQueriesExporter, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN);
} else {
try {
queryInsightsExporterFactory.closeExporter(topQueriesExporter);
} catch (IOException e) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.EXPORTER_FAIL_TO_CLOSE_EXCEPTION);
logger.error("Fail to close the current exporter when updating exporter, error: ", e);
}
// this is a new exporter, create it for all underlying services.
queryInsightsExporterFactory.createExporter(
TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID,
sinkType,
DEFAULT_TOP_N_QUERIES_INDEX_PATTERN,
"mappings/top-queries-record.json"
);
}
} else {
// Disable exporter if exporter type is set to null
// Close the current exporter and reader
if (currentExporter != null) {
try {
queryInsightsExporterFactory.closeExporter(topQueriesExporter);
queryInsightsExporterFactory.closeExporter(currentExporter);
} catch (IOException e) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.EXPORTER_FAIL_TO_CLOSE_EXCEPTION);
logger.error("Fail to close the current exporter when disabling exporter, error: ", e);
logger.error("Fail to close the current exporter when updating exporter and reader, error: ", e);
}
}
if (currentReader != null) {
try {
queryInsightsReaderFactory.closeReader(currentReader);
} catch (IOException e) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.READER_FAIL_TO_CLOSE_EXCEPTION);
logger.error("Fail to close the current reader when updating exporter and reader, error: ", e);
}
}
// Set sink type to local index for TopQueriesServices
if (sinkType == SinkType.LOCAL_INDEX) {
queryInsightsExporterFactory.createLocalIndexExporter(
TOP_QUERIES_EXPORTER_ID,
DEFAULT_TOP_N_QUERIES_INDEX_PATTERN,
"mappings/top-queries-record.json"
);
// Set up reader for TopQueriesService
queryInsightsReaderFactory.createLocalIndexReader(
TOP_QUERIES_READER_ID,
DEFAULT_TOP_N_QUERIES_INDEX_PATTERN,
namedXContentRegistry
);
}
// Set sink type to debug exporter
else if (sinkType == SinkType.DEBUG) {
queryInsightsExporterFactory.createDebugExporter(TOP_QUERIES_EXPORTER_ID);
}

// set up reader for top n queries service
final QueryInsightsReader reader = queryInsightsReaderFactory.createReader(
TOP_QUERIES_LOCAL_INDEX_READER_ID,
DEFAULT_TOP_N_QUERIES_INDEX_PATTERN,
namedXContentRegistry
);
queryInsightsReaderFactory.updateReader(reader, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN);
this.sinkType = sinkType;
}

/**
* Set the exporter delete after, then delete expired Top N indices
*
* @param deleteAfter the number of days after which Top N local indices should be deleted
*/
private void setExporterDeleteAfterAndDelete(final int deleteAfter) {
final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID);
public void setExporterDeleteAfterAndDelete(final int deleteAfter) {
final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_EXPORTER_ID);
if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) {
((LocalIndexExporter) topQueriesExporter).setDeleteAfter(deleteAfter);
deleteExpiredTopNIndices();
}
}

/**
* Validate the exporter delete after value
*
* @param deleteAfter exporter and reader settings
*/
void validateExporterDeleteAfter(final int deleteAfter) {
if (deleteAfter < MIN_DELETE_AFTER_VALUE || deleteAfter > MAX_DELETE_AFTER_VALUE) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.INVALID_EXPORTER_TYPE_FAILURES);
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Invalid exporter delete_after_days setting [%d], value should be an integer between %d and %d.",
deleteAfter,
MIN_DELETE_AFTER_VALUE,
MAX_DELETE_AFTER_VALUE
)
);
}
deleteExpiredTopNIndices();
}

/**
Expand Down Expand Up @@ -564,7 +594,7 @@ public QueryInsightsHealthStats getHealthStats() {
* Delete Top N local indices older than the configured data retention period
*/
void deleteExpiredTopNIndices() {
final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID);
final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_EXPORTER_ID);
if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) {
final LocalIndexExporter localIndexExporter = (LocalIndexExporter) topQueriesExporter;
threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> {
Expand All @@ -586,14 +616,14 @@ void deleteExpiredTopNIndices() {
/**
* Deletes all Top N local indices
*
* @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata}
*/
void deleteAllTopNIndices(
final Client client,
final Map<String, IndexMetadata> indexMetadataMap,
final LocalIndexExporter localIndexExporter
) {
indexMetadataMap.entrySet()
* @param client OpenSearch Client
* @param localIndexExporter the exporter to handle the local index operations
*/
void deleteAllTopNIndices(final Client client, final LocalIndexExporter localIndexExporter) {
clusterService.state()
.metadata()
.indices()
.entrySet()
.stream()
.filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue()))
.forEach(entry -> localIndexExporter.deleteSingleIndex(entry.getKey(), client));
Expand Down
Loading
Loading