Skip to content

Commit

Permalink
Top N indices auto deletion config & functionality
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 committed Jan 6, 2025
1 parent f570d9f commit 12ec6d5
Show file tree
Hide file tree
Showing 12 changed files with 328 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public Collection<Object> createComponents(
OperationalMetricsCounter.initialize(clusterService.getClusterName().toString(), metricsRegistry);
// create top n queries service
final QueryInsightsService queryInsightsService = new QueryInsightsService(
clusterService.getClusterSettings(),
clusterService,
threadPool,
client,
metricsRegistry,
Expand Down Expand Up @@ -145,6 +145,7 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING,
QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER,
QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,26 @@

package org.opensearch.plugin.insights.core.exporter;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_DELETE_AFTER_VALUE;

import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequestBuilder;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
Expand All @@ -36,6 +46,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter {
private final Logger logger = LogManager.getLogger();
private final Client client;
private DateTimeFormatter indexPattern;
private int deleteAfter;

/**
* Constructor of LocalIndexExporter
Expand All @@ -46,6 +57,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter {
public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) {
    // Retention starts at DEFAULT_DELETE_AFTER_VALUE; setDeleteAfter(int) can change it later.
    this.deleteAfter = DEFAULT_DELETE_AFTER_VALUE;
    this.client = client;
    this.indexPattern = indexPattern;
}

/**
Expand All @@ -61,11 +73,9 @@ public DateTimeFormatter getIndexPattern() {
* Setter of indexPattern
*
* @param indexPattern index pattern
* @return the current LocalIndexExporter
*/
public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) {
void setIndexPattern(DateTimeFormatter indexPattern) {
this.indexPattern = indexPattern;
return this;
}

/**
Expand All @@ -75,15 +85,15 @@ public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) {
*/
@Override
public void export(final List<SearchQueryRecord> records) {
if (records == null || records.size() == 0) {
if (records == null || records.isEmpty()) {
return;
}
try {
final String index = getDateTimeFromFormat();
final String indexName = buildLocalIndexName();
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1));
for (SearchQueryRecord record : records) {
bulkRequestBuilder.add(
new IndexRequest(index).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
new IndexRequest(indexName).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
}
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
Expand All @@ -110,7 +120,92 @@ public void close() {
logger.debug("Closing the LocalIndexExporter..");
}

private String getDateTimeFromFormat() {
return indexPattern.format(ZonedDateTime.now(ZoneOffset.UTC));
/**
 * Composes the local index name for the current UTC datetime: the datetime rendered with
 * {@code indexPattern}, a dash, and the 5-digit hash of the current UTC date.
 *
 * @return A string representing the index name in the format "top_queries-YYYY.MM.dd-01234".
 */
String buildLocalIndexName() {
    final ZonedDateTime utcNow = ZonedDateTime.now(ZoneOffset.UTC);
    final String datePart = indexPattern.format(utcNow);
    final String hashPart = generateLocalIndexDateHash();
    return datePart + "-" + hashPart;
}

/**
 * Sets the data retention period of the local index exporter.
 * <p>
 * The value is stored as-is (no range validation happens here) and is read by
 * {@link #deleteExpiredIndices(Map)}, which interprets it as a number of days.
 *
 * @param deleteAfter the number of days after which Top N local indices should be deleted
 */
public void setDeleteAfter(final int deleteAfter) {
this.deleteAfter = deleteAfter;
}

/**
 * Delete Top N local indices older than the configured data retention period.
 * <p>
 * An index is deleted when its name passes {@link #isTopQueriesIndex(String)} and its creation
 * date is at or before "now minus {@code deleteAfter} days". Deletion requests are issued
 * asynchronously; a failed deletion is logged and does not stop the remaining indices from
 * being processed.
 *
 * @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata};
 *                         a {@code null} or empty map is a no-op
 */
public void deleteExpiredIndices(final Map<String, IndexMetadata> indexMetadataMap) {
    if (indexMetadataMap == null || indexMetadataMap.isEmpty()) {
        return;
    }
    // Anything created at or before this instant is past its retention period.
    final long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(deleteAfter);
    for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
        final String indexName = entry.getKey();
        if (!isTopQueriesIndex(indexName)) {
            continue;
        }
        if (entry.getValue().getCreationDate() <= expirationMillisLong) {
            // Pass a listener instead of dropping the returned future, so that a failed
            // deletion is reported rather than silently lost.
            client.admin()
                .indices()
                .delete(
                    new DeleteIndexRequest(indexName),
                    ActionListener.wrap(
                        response -> logger.debug("Deleted expired Top N local index [{}]", indexName),
                        e -> logger.error("Failed to delete expired Top N local index [{}]", indexName, e)
                    )
                );
        }
    }
}

/**
 * Validates if the input string is a Query Insights local index name
 * in the exact format "top_queries-YYYY.MM.dd-XXXXX", where XXXXX is five digits.
 *
 * @param indexName the string to validate; {@code null} is treated as not matching.
 * @return {@code true} if the string is valid, {@code false} otherwise.
 */
static boolean isTopQueriesIndex(String indexName) {
    if (indexName == null) {
        return false;
    }

    // A negative limit keeps trailing empty segments. Without it, split("-") silently drops
    // them and a name such as "top_queries-2024.01.01-12345-" would pass the 3-part check.
    final String[] parts = indexName.split("-", -1);
    if (parts.length != 3) {
        return false;
    }

    // First part must be the fixed "top_queries" prefix
    if (!"top_queries".equals(parts[0])) {
        return false;
    }

    // Third part must be exactly 5 digits
    if (!parts[2].matches("\\d{5}")) {
        return false;
    }

    // Second part must parse as a date in "yyyy.MM.dd" format
    final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy.MM.dd", Locale.ROOT);
    try {
        LocalDate.parse(parts[1], formatter);
    } catch (DateTimeParseException e) {
        return false;
    }

    return true;
}

/**
 * Generates a consistent 5-digit numeric hash based on the current UTC date.
 * The generated hash is deterministic, meaning it will return the same result for the same date.
 *
 * @return A 5-digit numeric string representation of the current date's hash.
 */
public static String generateLocalIndexDateHash() {
    return generateLocalIndexDateHash(Instant.now().atOffset(ZoneOffset.UTC).toLocalDate());
}

/**
 * Generates the 5-digit numeric hash for an arbitrary date, so that the index name suffix of
 * any given day can be reproduced (not only today's).
 *
 * @param date the date to hash; must not be {@code null}
 * @return A 5-digit, zero-padded numeric string derived from the hash code of the date
 *         rendered as "yyyy-MM-dd".
 */
public static String generateLocalIndexDateHash(final LocalDate date) {
    final String isoDate = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ROOT).format(date);

    // floorMod maps a possibly negative hashCode into [0, 99999]
    return String.format(Locale.ROOT, "%05d", Math.floorMod(isoDate.hashCode(), 100000));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
public enum OperationalMetric {
LOCAL_INDEX_READER_PARSING_EXCEPTIONS("Number of errors when parsing with LocalIndexReader"),
LOCAL_INDEX_EXPORTER_BULK_FAILURES("Number of failures when ingesting Query Insights data to local indices"),
LOCAL_INDEX_EXPORTER_DELETE_FAILURES("Number of failures when deleting local indices"),
LOCAL_INDEX_EXPORTER_EXCEPTIONS("Number of exceptions in Query Insights LocalIndexExporter"),
INVALID_EXPORTER_TYPE_FAILURES("Number of invalid exporter type failures"),
INVALID_INDEX_PATTERN_EXCEPTIONS("Number of invalid index pattern exceptions"),
DATA_INGEST_EXCEPTIONS("Number of exceptions during data ingest in Query Insights"),
QUERY_CATEGORIZE_EXCEPTIONS("Number of exceptions when categorizing the queries"),
EXPORTER_FAIL_TO_CLOSE_EXCEPTION("Number of failures when closing the exporter"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.plugin.insights.core.reader;

import static org.opensearch.plugin.insights.core.exporter.LocalIndexExporter.generateLocalIndexDateHash;

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -99,8 +101,8 @@ public List<SearchQueryRecord> read(final String from, final String to) {
}
ZonedDateTime curr = start;
while (curr.isBefore(end.plusDays(1).toLocalDate().atStartOfDay(end.getZone()))) {
String index = getDateTimeFromFormat(curr);
SearchRequest searchRequest = new SearchRequest(index);
String indexName = buildLocalIndexName(curr);
SearchRequest searchRequest = new SearchRequest(indexName);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
MatchQueryBuilder excludeQuery = QueryBuilders.matchQuery("indices", "top_queries*");
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("timestamp")
Expand Down Expand Up @@ -135,7 +137,7 @@ public void close() {
logger.debug("Closing the LocalIndexReader..");
}

private String getDateTimeFromFormat(ZonedDateTime current) {
return current.format(indexPattern);
/**
 * Builds the name of the local index that holds the records for the given datetime.
 *
 * @param current the datetime whose index should be read
 * @return the index name: {@code current} rendered with {@code indexPattern}, a dash, and the
 *         5-digit hash of {@code current}'s date
 */
private String buildLocalIndexName(ZonedDateTime current) {
    // The exporter suffixes each index with a hash of the date on which it was written, so the
    // suffix has to come from the date being read. Hashing today's date would only ever match
    // today's index. The formula mirrors LocalIndexExporter.generateLocalIndexDateHash() and
    // must be kept in sync with it.
    final String isoDate = DateTimeFormatter.ofPattern("yyyy-MM-dd", java.util.Locale.ROOT).format(current.toLocalDate());
    final String dateHash = String.format(java.util.Locale.ROOT, "%05d", Math.floorMod(isoDate.hashCode(), 100000));
    return current.format(indexPattern) + "-" + dateHash;
}
}
Loading

0 comments on commit 12ec6d5

Please sign in to comment.