Skip to content

Commit

Permalink
Top N indices auto deletion config & functionality
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 committed Jan 7, 2025
1 parent f570d9f commit 5c84a92
Show file tree
Hide file tree
Showing 12 changed files with 441 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public Collection<Object> createComponents(
OperationalMetricsCounter.initialize(clusterService.getClusterName().toString(), metricsRegistry);
// create top n queries service
final QueryInsightsService queryInsightsService = new QueryInsightsService(
clusterService.getClusterSettings(),
clusterService,
threadPool,
client,
metricsRegistry,
Expand Down Expand Up @@ -145,6 +145,7 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING,
QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER,
QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,31 @@

package org.opensearch.plugin.insights.core.exporter;

import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_DELETE_AFTER_VALUE;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.bulk.BulkRequestBuilder;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
import org.opensearch.plugin.insights.core.service.TopQueriesService;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

/**
Expand All @@ -36,6 +45,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter {
private final Logger logger = LogManager.getLogger();
private final Client client;
private DateTimeFormatter indexPattern;
private int deleteAfter;

/**
* Constructor of LocalIndexExporter
Expand All @@ -46,6 +56,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter {
public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) {
this.indexPattern = indexPattern;
this.client = client;
this.deleteAfter = DEFAULT_DELETE_AFTER_VALUE;
}

/**
Expand All @@ -61,11 +72,9 @@ public DateTimeFormatter getIndexPattern() {
* Setter of indexPattern
*
* @param indexPattern index pattern
* @return the current LocalIndexExporter
*/
public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) {
void setIndexPattern(DateTimeFormatter indexPattern) {
this.indexPattern = indexPattern;
return this;
}

/**
Expand All @@ -75,15 +84,15 @@ public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) {
*/
@Override
public void export(final List<SearchQueryRecord> records) {
if (records == null || records.size() == 0) {
if (records == null || records.isEmpty()) {
return;
}
try {
final String index = getDateTimeFromFormat();
final String indexName = buildLocalIndexName();
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1));
for (SearchQueryRecord record : records) {
bulkRequestBuilder.add(
new IndexRequest(index).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
new IndexRequest(indexName).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
}
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
Expand All @@ -110,7 +119,52 @@ public void close() {
logger.debug("Closing the LocalIndexExporter..");
}

private String getDateTimeFromFormat() {
return indexPattern.format(ZonedDateTime.now(ZoneOffset.UTC));
/**
* Builds the local index name using the current UTC datetime
*
* @return A string representing the index name in the format "top_queries-YYYY.MM.dd-01234".
*/
String buildLocalIndexName() {
return indexPattern.format(ZonedDateTime.now(ZoneOffset.UTC)) + "-" + generateLocalIndexDateHash();
}

/**
* Set local index exporter data retention period
*
* @param deleteAfter the number of days after which Top N local indices should be deleted
*/
public void setDeleteAfter(final int deleteAfter) {
this.deleteAfter = deleteAfter;
}

/**
* Delete Top N local indices older than the configured data retention period
*
* @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata}
*/
public void deleteExpiredTopNIndices(final Map<String, IndexMetadata> indexMetadataMap) {
long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(deleteAfter);
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
String indexName = entry.getKey();
if (isTopQueriesIndex(indexName) && entry.getValue().getCreationDate() <= expirationMillisLong) {
// delete this index
TopQueriesService.deleteSingleIndex(indexName, client);
}
}
}

/**
* Generates a consistent 5-digit numeric hash based on the current UTC date.
* The generated hash is deterministic, meaning it will return the same result for the same date.
*
* @return A 5-digit numeric string representation of the current date's hash.
*/
public static String generateLocalIndexDateHash() {
// Get the current date in UTC (yyyy-MM-dd format)
String currentDate = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ROOT)
.format(Instant.now().atOffset(ZoneOffset.UTC).toLocalDate());

// Generate a 5-digit numeric hash from the date's hashCode
return String.format(Locale.ROOT, "%05d", (currentDate.hashCode() % 100000 + 100000) % 100000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
public enum OperationalMetric {
LOCAL_INDEX_READER_PARSING_EXCEPTIONS("Number of errors when parsing with LocalIndexReader"),
LOCAL_INDEX_EXPORTER_BULK_FAILURES("Number of failures when ingesting Query Insights data to local indices"),
LOCAL_INDEX_EXPORTER_DELETE_FAILURES("Number of failures when deleting local indices"),
LOCAL_INDEX_EXPORTER_EXCEPTIONS("Number of exceptions in Query Insights LocalIndexExporter"),
INVALID_EXPORTER_TYPE_FAILURES("Number of invalid exporter type failures"),
INVALID_INDEX_PATTERN_EXCEPTIONS("Number of invalid index pattern exceptions"),
DATA_INGEST_EXCEPTIONS("Number of exceptions during data ingest in Query Insights"),
QUERY_CATEGORIZE_EXCEPTIONS("Number of exceptions when categorizing the queries"),
EXPORTER_FAIL_TO_CLOSE_EXCEPTION("Number of failures when closing the exporter"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.plugin.insights.core.reader;

import static org.opensearch.plugin.insights.core.exporter.LocalIndexExporter.generateLocalIndexDateHash;

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -99,8 +101,8 @@ public List<SearchQueryRecord> read(final String from, final String to) {
}
ZonedDateTime curr = start;
while (curr.isBefore(end.plusDays(1).toLocalDate().atStartOfDay(end.getZone()))) {
String index = getDateTimeFromFormat(curr);
SearchRequest searchRequest = new SearchRequest(index);
String indexName = buildLocalIndexName(curr);
SearchRequest searchRequest = new SearchRequest(indexName);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
MatchQueryBuilder excludeQuery = QueryBuilders.matchQuery("indices", "top_queries*");
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("timestamp")
Expand Down Expand Up @@ -135,7 +137,7 @@ public void close() {
logger.debug("Closing the LocalIndexReader..");
}

private String getDateTimeFromFormat(ZonedDateTime current) {
return current.format(indexPattern);
private String buildLocalIndexName(ZonedDateTime current) {
return current.format(indexPattern) + "-" + generateLocalIndexDateHash();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings;

import java.io.IOException;
Expand All @@ -19,13 +20,15 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand All @@ -52,13 +55,15 @@ public class QueryInsightsService extends AbstractLifecycleComponent {

private static final Logger logger = LogManager.getLogger(QueryInsightsService.class);

private final ClusterService clusterService;

/**
* The internal OpenSearch thread pool that execute async processing and exporting tasks
*/
private final ThreadPool threadPool;

/**
* Services to capture top n queries for different metric types
* Map of {@link MetricType} to associated {@link TopQueriesService}
*/
private final Map<MetricType, TopQueriesService> topQueriesServices;

Expand All @@ -73,10 +78,10 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
private final LinkedBlockingQueue<SearchQueryRecord> queryRecordsQueue;

/**
* Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when
* List of references to delayed operations {@link Scheduler.Cancellable} so they can be cancelled when
* the service closed concurrently.
*/
protected volatile Scheduler.Cancellable scheduledFuture;
protected volatile List<Scheduler.Cancellable> scheduledFutures;

/**
* Query Insights exporter factory
Expand All @@ -102,20 +107,21 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
/**
* Constructor of the QueryInsightsService
*
* @param clusterSettings OpenSearch cluster level settings
* @param clusterService OpenSearch cluster service
* @param threadPool The OpenSearch thread pool to run async tasks
* @param client OS client
* @param metricsRegistry Opentelemetry Metrics registry
* @param namedXContentRegistry NamedXContentRegistry for parsing purposes
*/
@Inject
public QueryInsightsService(
final ClusterSettings clusterSettings,
final ClusterService clusterService,
final ThreadPool threadPool,
final Client client,
final MetricsRegistry metricsRegistry,
final NamedXContentRegistry namedXContentRegistry
) {
this.clusterService = clusterService;
enableCollect = new HashMap<>();
queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY);
this.threadPool = threadPool;
Expand All @@ -128,15 +134,22 @@ public QueryInsightsService(
enableCollect.put(metricType, false);
topQueriesServices.put(
metricType,
new TopQueriesService(metricType, threadPool, queryInsightsExporterFactory, queryInsightsReaderFactory)
new TopQueriesService(client, metricType, threadPool, queryInsightsExporterFactory, queryInsightsReaderFactory)
);
}
for (MetricType type : MetricType.allMetricTypes()) {
clusterSettings.addSettingsUpdateConsumer(
getExporterSettings(type),
(settings -> setExporterAndReader(type, settings)),
(settings -> validateExporterAndReaderConfig(type, settings))
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
getExporterSettings(type),
(settings -> setExporterAndReader(type, settings, clusterService.state().metadata().indices())),
(settings -> validateExporterAndReaderConfig(type, settings))
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_EXPORTER_DELETE_AFTER,
(settings -> setExporterDeleteAfter(type, settings)),
(TopQueriesService::validateExporterDeleteAfter)
);
}

this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry);
Expand Down Expand Up @@ -389,14 +402,26 @@ public void setTopNSize(final MetricType type, final int topNSize) {
* @param type {@link MetricType}
* @param settings exporter and reader settings
*/
public void setExporterAndReader(final MetricType type, final Settings settings) {
private void setExporterAndReader(final MetricType type, final Settings settings, final Map<String, IndexMetadata> indexMetadataMap) {
if (topQueriesServices.containsKey(type)) {
TopQueriesService tqs = topQueriesServices.get(type);
tqs.setExporter(settings);
tqs.setExporter(settings, indexMetadataMap);
tqs.setReader(settings, namedXContentRegistry);
}
}

/**
* Set the exporter delete after
*
* @param type {@link MetricType}
* @param deleteAfter the number of days after which Top N local indices should be deleted
*/
private void setExporterDeleteAfter(final MetricType type, final int deleteAfter) {
if (topQueriesServices.containsKey(type)) {
topQueriesServices.get(type).setExporterDeleteAfter(deleteAfter);
}
}

/**
* Get search query categorizer object
* @return SearchQueryCategorizer object
Expand All @@ -421,18 +446,32 @@ public void validateExporterAndReaderConfig(final MetricType type, final Setting
@Override
protected void doStart() {
if (isAnyFeatureEnabled()) {
scheduledFuture = threadPool.scheduleWithFixedDelay(
this::drainRecords,
QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL,
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR
scheduledFutures = new ArrayList<>();
scheduledFutures.add(
threadPool.scheduleWithFixedDelay(
this::drainRecords,
QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL,
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR
)
);
scheduledFutures.add(
threadPool.scheduleWithFixedDelay(
this::deleteExpiredTopNIndices,
new TimeValue(1, TimeUnit.DAYS), // Check for deletable indices once per day
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR
)
);
}
}

@Override
protected void doStop() {
if (scheduledFuture != null) {
scheduledFuture.cancel();
if (scheduledFutures != null) {
for (Scheduler.Cancellable cancellable : scheduledFutures) {
if (cancellable != null) {
cancellable.cancel();
}
}
}
}

Expand Down Expand Up @@ -462,4 +501,13 @@ public QueryInsightsHealthStats getHealthStats() {
topQueriesHealthStatsMap
);
}

/**
* Delete Top N local indices older than the configured data retention period
*/
private void deleteExpiredTopNIndices() {
for (MetricType metricType : MetricType.allMetricTypes()) {
topQueriesServices.get(metricType).deleteExpiredTopNIndices(clusterService.state().metadata().indices());
}
}
}
Loading

0 comments on commit 5c84a92

Please sign in to comment.