Skip to content

Commit

Permalink
Add field type cache stats
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 committed Jan 23, 2025
1 parent 1d991b8 commit 36baab6
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public QueryInsightsListener(
this.clusterService = clusterService;
this.queryInsightsService = queryInsightsService;
this.queryShapeGenerator = new QueryShapeGenerator(clusterService);
queryInsightsService.setQueryShapeGenerator(queryShapeGenerator);

// Setting endpoints set up for top n queries, including enabling top n queries, window size, and top n size
// Expected metricTypes are Latency, CPU, and Memory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -36,6 +38,7 @@
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
import org.opensearch.plugin.insights.core.reader.QueryInsightsReaderFactory;
import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator;
import org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer;
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
Expand Down Expand Up @@ -100,9 +103,14 @@ public class QueryInsightsService extends AbstractLifecycleComponent {

private volatile boolean searchQueryMetricsEnabled;

private SearchQueryCategorizer searchQueryCategorizer;
private final SearchQueryCategorizer searchQueryCategorizer;

private NamedXContentRegistry namedXContentRegistry;
private final NamedXContentRegistry namedXContentRegistry;

/**
* Query shape generator instance
*/
private QueryShapeGenerator queryShapeGenerator;

/**
* Constructor of the QueryInsightsService
Expand Down Expand Up @@ -496,10 +504,14 @@ public QueryInsightsHealthStats getHealthStats() {
Map<MetricType, TopQueriesHealthStats> topQueriesHealthStatsMap = topQueriesServices.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getHealthStats()));
Map<String, Long> fieldTypeCacheStats = Optional.ofNullable(queryShapeGenerator)
.map(QueryShapeGenerator::getFieldTypeCacheStats)
.orElse(Collections.emptyMap());
return new QueryInsightsHealthStats(
threadPool.info(QUERY_INSIGHTS_EXECUTOR),
this.queryRecordsQueue.size(),
topQueriesHealthStatsMap
topQueriesHealthStatsMap,
fieldTypeCacheStats
);
}

Expand All @@ -511,4 +523,11 @@ private void deleteExpiredTopNIndices() {
topQueriesServices.get(metricType).deleteExpiredTopNIndices(clusterService.state().metadata().indices());
}
}

/**
* Set query shape generator
*/
public void setQueryShapeGenerator(final QueryShapeGenerator queryShapeGenerator) {
this.queryShapeGenerator = queryShapeGenerator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ public class IndicesFieldTypeCache {

private static final Logger logger = LogManager.getLogger(IndicesFieldTypeCache.class);
private final Cache<Index, IndexFieldMap> cache;
/**
* Count of cache evictions
*/
private final CounterMetric evictionCount;
/**
* Count of items in cache
*/
private final CounterMetric entryCount;
/**
* Weight of cache in bytes
*/
private final CounterMetric weight;

public IndicesFieldTypeCache(Settings settings) {
final long sizeInBytes = QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY.get(settings).getBytes();
Expand All @@ -36,9 +48,12 @@ public IndicesFieldTypeCache(Settings settings) {
cacheBuilder.setMaximumWeight(sizeInBytes).weigher((k, v) -> RamUsageEstimator.sizeOfObject(k) + v.weight());
}
cache = cacheBuilder.build();
evictionCount = new CounterMetric();
entryCount = new CounterMetric();
weight = new CounterMetric();
}

public IndexFieldMap getOrInitialize(Index index) {
IndexFieldMap getOrInitialize(Index index) {
try {
return cache.computeIfAbsent(index, k -> new IndexFieldMap());
} catch (ExecutionException ex) {
Expand All @@ -52,16 +67,50 @@ public IndexFieldMap getOrInitialize(Index index) {
}

public void invalidate(Index index) {
IndexFieldMap indexFieldMap = cache.get(index);
evictionCount.inc(indexFieldMap.fieldTypeMap.size());
entryCount.dec(indexFieldMap.fieldTypeMap.size());
weight.dec(indexFieldMap.weight());
cache.invalidate(index);
}

public Iterable<Index> keySet() {
return cache.keys();
}

public void incrementCountAndWeight(String key, String value) {
entryCount.inc();
weight.inc(RamUsageEstimator.sizeOf(key) + RamUsageEstimator.sizeOf(value));
}

/**
* Get eviction count
*/
public Long getEvictionCount() {
return evictionCount.count();
}

/**
* Get entry count
*/
public Long getEntryCount() {
return entryCount.count();
}

/**
* Get cache weight in bytes
*/
public Long getWeight() {
return weight.count();
}

static class IndexFieldMap {
private ConcurrentHashMap<String, String> fieldTypeMap;
private CounterMetric weight;
private final ConcurrentHashMap<String, String> fieldTypeMap;

/**
* Estimated memory consumption of fieldTypeMap in bytes
*/
private final CounterMetric weight;

IndexFieldMap() {
fieldTypeMap = new ConcurrentHashMap<>();
Expand All @@ -72,11 +121,18 @@ public String get(String fieldName) {
return fieldTypeMap.get(fieldName);
}

public void putIfAbsent(String key, String value) {
/**
* Stores key, value if absent
*
* @return {@code true} if key was absent, else {@code false}
*/
public boolean putIfAbsent(String key, String value) {
// Increment the weight only if the key value pair added to the Map
if (fieldTypeMap.putIfAbsent(key, value) == null) {
weight.inc(RamUsageEstimator.sizeOf(key) + RamUsageEstimator.sizeOf(value));
return true;
}
return false;
}

public long weight() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,23 @@ public class QueryShapeGenerator implements ClusterStateListener {
static final String EMPTY_STRING = "";
static final String ONE_SPACE_INDENT = " ";
private final ClusterService clusterService;
private final String NO_FIELD_TYPE_VALUE = "";
private final IndicesFieldTypeCache indicesFieldTypeCache;
private long cacheHitCount;
private long cacheMissCount;

private final String NO_FIELD_TYPE_VALUE = "";
public static final String HIT_COUNT = "hit_count";
public static final String MISS_COUNT = "miss_count";
public static final String EVICTIONS = "evictions";
public static final String ENTRY_COUNT = "entry_count";
public static final String SIZE_IN_BYTES = "size_in_bytes";

public QueryShapeGenerator(ClusterService clusterService) {
this.clusterService = clusterService;
clusterService.addListener(this);
this.indicesFieldTypeCache = new IndicesFieldTypeCache(clusterService.getSettings());
this.cacheHitCount = 0;
this.cacheMissCount = 0;
}

public void clusterChanged(ClusterChangedEvent event) {
Expand Down Expand Up @@ -369,14 +379,20 @@ String getFieldType(String fieldName, Map<String, Object> propertiesAsMap, Index
String fieldType = getFieldTypeFromCache(fieldName, index);

if (fieldType != null) {
cacheHitCount += 1;
return fieldType;
} else {
cacheMissCount += 1;
}

// Retrieve field type from mapping and cache it if found
fieldType = getFieldTypeFromProperties(fieldName, propertiesAsMap);

// Cache field type or NO_FIELD_TYPE_VALUE if not found
indicesFieldTypeCache.getOrInitialize(index).putIfAbsent(fieldName, fieldType != null ? fieldType : NO_FIELD_TYPE_VALUE);
fieldType = fieldType != null ? fieldType : NO_FIELD_TYPE_VALUE;
if (indicesFieldTypeCache.getOrInitialize(index).putIfAbsent(fieldName, fieldType)) {
indicesFieldTypeCache.incrementCountAndWeight(fieldName, fieldType);
}

return fieldType;
}
Expand Down Expand Up @@ -420,4 +436,24 @@ else if (currentMap.containsKey("type")) {
String getFieldTypeFromCache(String fieldName, Index index) {
return indicesFieldTypeCache.getOrInitialize(index).get(fieldName);
}

/**
* Get field type cache stats
*
* @return Map containing cache hit count, miss count, and byte stats
*/
public Map<String, Long> getFieldTypeCacheStats() {
return Map.of(
SIZE_IN_BYTES,
indicesFieldTypeCache.getWeight(),
ENTRY_COUNT,
indicesFieldTypeCache.getEntryCount(),
EVICTIONS,
indicesFieldTypeCache.getEvictionCount(),
HIT_COUNT,
cacheHitCount,
MISS_COUNT,
cacheMissCount
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,17 @@

package org.opensearch.plugin.insights.rules.model.healthStats;

import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.ENTRY_COUNT;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.EVICTIONS;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.HIT_COUNT;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.MISS_COUNT;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.SIZE_IN_BYTES;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.opensearch.Version;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand All @@ -26,10 +35,12 @@ public class QueryInsightsHealthStats implements ToXContentFragment, Writeable {
private final ThreadPool.Info threadPoolInfo;
private final int queryRecordsQueueSize;
private final Map<MetricType, TopQueriesHealthStats> topQueriesHealthStats;
private Map<String, Long> fieldTypeCacheStats;

private static final String THREAD_POOL_INFO = "ThreadPoolInfo";
private static final String QUERY_RECORDS_QUEUE_SIZE = "QueryRecordsQueueSize";
private static final String TOP_QUERIES_HEALTH_STATS = "TopQueriesHealthStats";
private static final String FIELD_TYPE_CACHE_STATS = "FieldTypeCacheStats";

/**
* Constructor to read QueryInsightsHealthStats from a StreamInput.
Expand All @@ -41,6 +52,9 @@ public QueryInsightsHealthStats(final StreamInput in) throws IOException {
this.threadPoolInfo = new ThreadPool.Info(in);
this.queryRecordsQueueSize = in.readInt();
this.topQueriesHealthStats = in.readMap(MetricType::readFromStream, TopQueriesHealthStats::new);
if (in.getVersion().onOrAfter(Version.V_2_19_0)) {
this.fieldTypeCacheStats = in.readMap(StreamInput::readString, StreamInput::readLong);
}
}

/**
Expand All @@ -53,14 +67,16 @@ public QueryInsightsHealthStats(final StreamInput in) throws IOException {
public QueryInsightsHealthStats(
final ThreadPool.Info threadPoolInfo,
final int queryRecordsQueueSize,
final Map<MetricType, TopQueriesHealthStats> topQueriesHealthStats
final Map<MetricType, TopQueriesHealthStats> topQueriesHealthStats,
final Map<String, Long> fieldTypeCacheStats
) {
if (threadPoolInfo == null || topQueriesHealthStats == null) {
throw new IllegalArgumentException("Parameters cannot be null");
}
this.threadPoolInfo = threadPoolInfo;
this.queryRecordsQueueSize = queryRecordsQueueSize;
this.topQueriesHealthStats = topQueriesHealthStats;
this.fieldTypeCacheStats = Objects.requireNonNull(fieldTypeCacheStats, "fieldTypeCacheStats cannot be null");
}

/**
Expand All @@ -87,6 +103,12 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
builder.endObject();
}
builder.endObject();
// Write field type cache stats
builder.startObject(FIELD_TYPE_CACHE_STATS);
for (String key : List.of(SIZE_IN_BYTES, ENTRY_COUNT, EVICTIONS, HIT_COUNT, MISS_COUNT)) {
builder.field(key, fieldTypeCacheStats.getOrDefault(key, 0L));
}
builder.endObject();
return builder;
}

Expand All @@ -105,6 +127,9 @@ public void writeTo(final StreamOutput out) throws IOException {
MetricType::writeTo,
(streamOutput, topQueriesHealthStats) -> topQueriesHealthStats.writeTo(out)
);
if (out.getVersion().onOrAfter(Version.V_2_19_0)) {
out.writeMap(fieldTypeCacheStats, StreamOutput::writeString, StreamOutput::writeLong);
}
}

/**
Expand Down Expand Up @@ -133,4 +158,13 @@ public int getQueryRecordsQueueSize() {
public Map<MetricType, TopQueriesHealthStats> getTopQueriesHealthStats() {
return topQueriesHealthStats;
}

/**
* Get the field type cache stats.
*
* @return the field type cache stats
*/
public Map<String, Long> getFieldTypeCacheStats() {
return fieldTypeCacheStats;
}
}
Loading

0 comments on commit 36baab6

Please sign in to comment.