Skip to content

Commit

Permalink
improve safeguard for deleting top queries indices with index mapping
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Jan 24, 2025
1 parent 03355c8 commit 70daa15
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter {
"measurements.cpu.number",
"measurements.memory.number"
);
private static final List<String> DEFAULT_SORTED_ORDERS = List.of(
"desc",
"desc",
"desc"
);
private static final List<String> DEFAULT_SORTED_ORDERS = List.of("desc", "desc", "desc");

/**
* Constructor of LocalIndexExporter
Expand All @@ -80,7 +76,13 @@ public final class LocalIndexExporter implements QueryInsightsExporter {
* @param indexMapping the index mapping file
* @param id id of the exporter
*/
public LocalIndexExporter(final Client client, final ClusterService clusterService, final DateTimeFormatter indexPattern, final String indexMapping, final String id) {
public LocalIndexExporter(
final Client client,
final ClusterService clusterService,
final DateTimeFormatter indexPattern,
final String indexMapping,
final String id
) {
this.indexPattern = indexPattern;
this.client = client;
this.clusterService = clusterService;
Expand Down Expand Up @@ -127,11 +129,12 @@ public void export(final List<SearchQueryRecord> records) {
if (!checkIndexExists(indexName)) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);

createIndexRequest.settings(Settings.builder()
.putList("index.sort.field", DEFAULT_SORTED_FIELDS)
.putList("index.sort.order", DEFAULT_SORTED_ORDERS)
.put("index.number_of_shards", DEFAULT_NUMBER_OF_SHARDS)
.put("index.number_of_replicas", DEFAULT_NUMBER_OF_REPLICA)
createIndexRequest.settings(
Settings.builder()
.putList("index.sort.field", DEFAULT_SORTED_FIELDS)
.putList("index.sort.order", DEFAULT_SORTED_ORDERS)
.put("index.number_of_shards", DEFAULT_NUMBER_OF_SHARDS)
.put("index.number_of_replicas", DEFAULT_NUMBER_OF_REPLICA)
);
createIndexRequest.mapping(readIndexMappings());

Expand All @@ -147,6 +150,7 @@ public void onResponse(CreateIndexResponse createIndexResponse) {
}
}
}

@Override
public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
Expand Down Expand Up @@ -175,7 +179,8 @@ private void bulk(final String indexName, final List<SearchQueryRecord> records)
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1));
for (SearchQueryRecord record : records) {
bulkRequestBuilder.add(
new IndexRequest(indexName).id(record.getId()).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
new IndexRequest(indexName).id(record.getId())
.source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
}
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
Expand Down Expand Up @@ -225,7 +230,7 @@ public void deleteExpiredTopNIndices(final Map<String, IndexMetadata> indexMetad
long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(deleteAfter);
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
String indexName = entry.getKey();
if (isTopQueriesIndex(indexName) && entry.getValue().getCreationDate() <= expirationMillisLong) {
if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) {
// delete this index
TopQueriesService.deleteSingleIndex(indexName, client);
}
Expand Down Expand Up @@ -263,8 +268,7 @@ private boolean checkIndexExists(String indexName) {
*/
private String readIndexMappings() throws IOException {
return new String(
Objects.requireNonNull(LocalIndexExporter.class.getClassLoader().getResourceAsStream(indexMapping))
.readAllBytes(),
Objects.requireNonNull(LocalIndexExporter.class.getClassLoader().getResourceAsStream(indexMapping)).readAllBytes(),
Charset.defaultCharset()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,13 @@ public void validateExporterType(final String exporterType) throws IllegalArgume
*/
public QueryInsightsExporter createExporter(String id, SinkType type, String indexPattern, String indexMapping) {
if (SinkType.LOCAL_INDEX.equals(type)) {
QueryInsightsExporter exporter = new LocalIndexExporter(client, clusterService, DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT), indexMapping, id);
QueryInsightsExporter exporter = new LocalIndexExporter(
client,
clusterService,
DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT),
indexMapping,
id
);
this.exporters.put(id, exporter);
return exporter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public List<SearchQueryRecord> read(final String from, final String to, String i
while (curr.isBefore(end.plusDays(1).toLocalDate().atStartOfDay(end.getZone()))) {
String indexName = buildLocalIndexName(curr);
SearchRequest searchRequest = new SearchRequest(indexName);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(1000);
MatchQueryBuilder excludeQuery = QueryBuilders.matchQuery("indices", "top_queries*");
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("timestamp")
.from(start.toInstant().toEpochMilli())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
* information related to search queries
*/
public class QueryInsightsService extends AbstractLifecycleComponent {
public static final String QUERY_INSIGHTS_INDEX_TAG_NAME = "query_insights_feature_space";

private static final Logger logger = LogManager.getLogger(QueryInsightsService.class);

Expand Down Expand Up @@ -143,7 +144,7 @@ public QueryInsightsService(
enableCollect = new HashMap<>();
queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY);
this.threadPool = threadPool;
this.queryInsightsExporterFactory = new QueryInsightsExporterFactory(client);
this.queryInsightsExporterFactory = new QueryInsightsExporterFactory(client, clusterService);
this.queryInsightsReaderFactory = new QueryInsightsReaderFactory(client);
this.namedXContentRegistry = namedXContentRegistry;
this.client = client;
Expand Down Expand Up @@ -581,7 +582,7 @@ private void deleteExpiredTopNIndices() {
void deleteAllTopNIndices(final Client client, final Map<String, IndexMetadata> indexMetadataMap) {
indexMetadataMap.entrySet()
.stream()
.filter(entry -> isTopQueriesIndex(entry.getKey()))
.filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue()))
.forEach(entry -> deleteSingleIndex(entry.getKey(), client));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.plugin.insights.core.service;

import static org.opensearch.plugin.insights.core.service.QueryInsightsService.QUERY_INSIGHTS_INDEX_TAG_NAME;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MAX_DELETE_AFTER_VALUE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MIN_DELETE_AFTER_VALUE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;
Expand All @@ -28,6 +29,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
Expand All @@ -37,6 +39,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
Expand Down Expand Up @@ -64,6 +67,7 @@
public class TopQueriesService {
public static final String TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID = "top_queries_local_index_exporter";
public static final String TOP_QUERIES_LOCAL_INDEX_READER_ID = "top_queries_local_index_reader";
public static final String TOP_QUERIES_INDEX_TAG_VALUE = "top_n_queries";
private static final String METRIC_TYPE_TAG = "metric_type";
private static final String GROUPBY_TAG = "groupby";

Expand Down Expand Up @@ -538,35 +542,55 @@ public void onFailure(Exception e) {
}

/**
* Validates if the input string is a Query Insights local index name
* in the format "top_queries-YYYY.MM.dd-XXXXX".
* Validates if the input string is a Query Insights local index
* in the format "top_queries-YYYY.MM.dd-XXXXX", and has the expected index metadata.
*
* @param indexName the string to validate.
* @param indexName the index name to validate.
* @param indexMetadata the metadata associated with the index
* @return {@code true} if the index name has the expected format and the index metadata carries the top queries tag, {@code false} otherwise.
*/
public static boolean isTopQueriesIndex(String indexName) {
// Split the input string by '-'
String[] parts = indexName.split("-");
public static boolean isTopQueriesIndex(String indexName, IndexMetadata indexMetadata) {
try {
if (indexMetadata == null || indexMetadata.mapping() == null) {
return false;
}
Map<String, Object> sourceMap = Objects.requireNonNull(indexMetadata.mapping()).getSourceAsMap();
if (sourceMap == null || !sourceMap.containsKey("_meta")) {
return false;
}
Map<String, Object> metaMap = (Map<String, Object>) sourceMap.get("_meta");
if (metaMap == null || !metaMap.containsKey(QUERY_INSIGHTS_INDEX_TAG_NAME)) {
return false;
}
if (!metaMap.get(QUERY_INSIGHTS_INDEX_TAG_NAME).equals(TOP_QUERIES_INDEX_TAG_VALUE)) {
return false;
}

// Check if the string has exactly 3 parts
if (parts.length != 3) {
return false;
}
// Split the input string by '-'
String[] parts = indexName.split("-");

// Validate the first part is "top_queries"
if (!"top_queries".equals(parts[0])) {
return false;
}
// Check if the string has exactly 3 parts
if (parts.length != 3) {
return false;
}

// Validate the second part is a valid date in "YYYY.MM.dd" format
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy.MM.dd", Locale.ROOT);
try {
LocalDate.parse(parts[1], formatter);
} catch (DateTimeParseException e) {
// Validate the first part is "top_queries"
if (!"top_queries".equals(parts[0])) {
return false;
}

// Validate the second part is a valid date in "YYYY.MM.dd" format
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy.MM.dd", Locale.ROOT);
try {
LocalDate.parse(parts[1], formatter);
} catch (DateTimeParseException e) {
return false;
}

// Validate the third part is exactly 5 digits
return parts[2].matches("\\d{5}");
} catch (Exception e) {
return false;
}

// Validate the third part is exactly 5 digits
return parts[2].matches("\\d{5}");
}
}

0 comments on commit 70daa15

Please sign in to comment.