Skip to content

Commit

Permalink
Add field type cache stats
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 committed Jan 22, 2025
1 parent 1d991b8 commit c2942ad
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public QueryInsightsListener(
this.clusterService = clusterService;
this.queryInsightsService = queryInsightsService;
this.queryShapeGenerator = new QueryShapeGenerator(clusterService);
queryInsightsService.setQueryShapeGenerator(queryShapeGenerator);

// Setting endpoints set up for top n queries, including enabling top n queries, window size, and top n size
// Expected metricTypes are Latency, CPU, and Memory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -36,6 +38,7 @@
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
import org.opensearch.plugin.insights.core.reader.QueryInsightsReaderFactory;
import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator;
import org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer;
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
Expand Down Expand Up @@ -100,9 +103,14 @@ public class QueryInsightsService extends AbstractLifecycleComponent {

private volatile boolean searchQueryMetricsEnabled;

private SearchQueryCategorizer searchQueryCategorizer;
private final SearchQueryCategorizer searchQueryCategorizer;

private NamedXContentRegistry namedXContentRegistry;
private final NamedXContentRegistry namedXContentRegistry;

/**
* Query shape generator instance
*/
private QueryShapeGenerator queryShapeGenerator;

/**
* Constructor of the QueryInsightsService
Expand Down Expand Up @@ -496,10 +504,14 @@ public QueryInsightsHealthStats getHealthStats() {
Map<MetricType, TopQueriesHealthStats> topQueriesHealthStatsMap = topQueriesServices.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getHealthStats()));
Map<String, Long> fieldTypeCacheStats = Optional.ofNullable(queryShapeGenerator)
.map(QueryShapeGenerator::getFieldTypeCacheStats)
.orElse(Collections.emptyMap());
return new QueryInsightsHealthStats(
threadPool.info(QUERY_INSIGHTS_EXECUTOR),
this.queryRecordsQueue.size(),
topQueriesHealthStatsMap
topQueriesHealthStatsMap,
fieldTypeCacheStats
);
}

Expand All @@ -511,4 +523,11 @@ private void deleteExpiredTopNIndices() {
topQueriesServices.get(metricType).deleteExpiredTopNIndices(clusterService.state().metadata().indices());
}
}

/**
* Set query shape generator
*/
public void setQueryShapeGenerator(final QueryShapeGenerator queryShapeGenerator) {
this.queryShapeGenerator = queryShapeGenerator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ public class IndicesFieldTypeCache {

private static final Logger logger = LogManager.getLogger(IndicesFieldTypeCache.class);
private final Cache<Index, IndexFieldMap> cache;
/**
* Count of items in the cache
*/
private final CounterMetric count;
/**
* Weight of cache in bytes
*/
private final CounterMetric weight;

public IndicesFieldTypeCache(Settings settings) {
final long sizeInBytes = QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY.get(settings).getBytes();
Expand All @@ -36,9 +44,11 @@ public IndicesFieldTypeCache(Settings settings) {
cacheBuilder.setMaximumWeight(sizeInBytes).weigher((k, v) -> RamUsageEstimator.sizeOfObject(k) + v.weight());
}
cache = cacheBuilder.build();
count = new CounterMetric();
weight = new CounterMetric();
}

public IndexFieldMap getOrInitialize(Index index) {
IndexFieldMap getOrInitialize(Index index) {
try {
return cache.computeIfAbsent(index, k -> new IndexFieldMap());
} catch (ExecutionException ex) {
Expand All @@ -52,16 +62,41 @@ public IndexFieldMap getOrInitialize(Index index) {
}

public void invalidate(Index index) {
count.dec(cache.get(index).fieldTypeMap.size());
weight.dec(cache.get(index).weight());
cache.invalidate(index);
}

public Iterable<Index> keySet() {
return cache.keys();
}

public void incrementCountAndWeight(String key, String value) {
count.inc();
weight.inc(RamUsageEstimator.sizeOf(key) + RamUsageEstimator.sizeOf(value));
}

/**
* Get cache weight in bytes
*/
public Long getCount() {
return count.count();
}

/**
* Get cache weight in bytes
*/
public Long getWeight() {
return weight.count();
}

static class IndexFieldMap {
private ConcurrentHashMap<String, String> fieldTypeMap;
private CounterMetric weight;
private final ConcurrentHashMap<String, String> fieldTypeMap;

/**
* Estimated memory consumption of fieldTypeMap in bytes
*/
private final CounterMetric weight;

IndexFieldMap() {
fieldTypeMap = new ConcurrentHashMap<>();
Expand All @@ -72,11 +107,18 @@ public String get(String fieldName) {
return fieldTypeMap.get(fieldName);
}

public void putIfAbsent(String key, String value) {
/**
* Stores key, value if absent
*
* @return {@code true} if key was absent, else {@code false}
*/
public boolean putIfAbsent(String key, String value) {
// Increment the weight only if the key value pair added to the Map
if (fieldTypeMap.putIfAbsent(key, value) == null) {
weight.inc(RamUsageEstimator.sizeOf(key) + RamUsageEstimator.sizeOf(value));
return true;
}
return false;
}

public long weight() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,22 @@ public class QueryShapeGenerator implements ClusterStateListener {
static final String EMPTY_STRING = "";
static final String ONE_SPACE_INDENT = " ";
private final ClusterService clusterService;
private final String NO_FIELD_TYPE_VALUE = "";
private final IndicesFieldTypeCache indicesFieldTypeCache;
private long cacheHitCount;
private long cacheMissCount;

private final String NO_FIELD_TYPE_VALUE = "";
public static final String HITS_STRING = "hits";
public static final String MISSES_STRING = "misses";
public static final String COUNT_STRING = "count";
public static final String BYTES_STRING = "bytes";

public QueryShapeGenerator(ClusterService clusterService) {
this.clusterService = clusterService;
clusterService.addListener(this);
this.indicesFieldTypeCache = new IndicesFieldTypeCache(clusterService.getSettings());
this.cacheHitCount = 0;
this.cacheMissCount = 0;
}

public void clusterChanged(ClusterChangedEvent event) {
Expand Down Expand Up @@ -369,14 +378,20 @@ String getFieldType(String fieldName, Map<String, Object> propertiesAsMap, Index
String fieldType = getFieldTypeFromCache(fieldName, index);

if (fieldType != null) {
cacheHitCount += 1;
return fieldType;
} else {
cacheMissCount += 1;
}

// Retrieve field type from mapping and cache it if found
fieldType = getFieldTypeFromProperties(fieldName, propertiesAsMap);

// Cache field type or NO_FIELD_TYPE_VALUE if not found
indicesFieldTypeCache.getOrInitialize(index).putIfAbsent(fieldName, fieldType != null ? fieldType : NO_FIELD_TYPE_VALUE);
fieldType = fieldType != null ? fieldType : NO_FIELD_TYPE_VALUE;
if (indicesFieldTypeCache.getOrInitialize(index).putIfAbsent(fieldName, fieldType)) {
indicesFieldTypeCache.incrementCountAndWeight(fieldName, fieldType);
}

return fieldType;
}
Expand Down Expand Up @@ -420,4 +435,22 @@ else if (currentMap.containsKey("type")) {
String getFieldTypeFromCache(String fieldName, Index index) {
return indicesFieldTypeCache.getOrInitialize(index).get(fieldName);
}

/**
* Get field type cache stats
*
* @return Map containing cache hit count, miss count, and byte stats
*/
public Map<String, Long> getFieldTypeCacheStats() {
return Map.of(
HITS_STRING,
cacheHitCount,
MISSES_STRING,
cacheMissCount,
COUNT_STRING,
indicesFieldTypeCache.getCount(),
BYTES_STRING,
indicesFieldTypeCache.getWeight()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,16 @@

package org.opensearch.plugin.insights.rules.model.healthStats;

import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.BYTES_STRING;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.COUNT_STRING;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.HITS_STRING;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.MISSES_STRING;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.opensearch.Version;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand All @@ -26,10 +34,12 @@ public class QueryInsightsHealthStats implements ToXContentFragment, Writeable {
private final ThreadPool.Info threadPoolInfo;
private final int queryRecordsQueueSize;
private final Map<MetricType, TopQueriesHealthStats> topQueriesHealthStats;
private Map<String, Long> fieldTypeCacheStats;

private static final String THREAD_POOL_INFO = "ThreadPoolInfo";
private static final String QUERY_RECORDS_QUEUE_SIZE = "QueryRecordsQueueSize";
private static final String TOP_QUERIES_HEALTH_STATS = "TopQueriesHealthStats";
private static final String FIELD_TYPE_CACHE_STATS = "FieldTypeCacheStats";

/**
* Constructor to read QueryInsightsHealthStats from a StreamInput.
Expand All @@ -41,6 +51,9 @@ public QueryInsightsHealthStats(final StreamInput in) throws IOException {
this.threadPoolInfo = new ThreadPool.Info(in);
this.queryRecordsQueueSize = in.readInt();
this.topQueriesHealthStats = in.readMap(MetricType::readFromStream, TopQueriesHealthStats::new);
if (in.getVersion().onOrAfter(Version.V_2_19_0)) {
this.fieldTypeCacheStats = in.readMap(StreamInput::readString, StreamInput::readLong);
}
}

/**
Expand All @@ -53,14 +66,16 @@ public QueryInsightsHealthStats(final StreamInput in) throws IOException {
public QueryInsightsHealthStats(
final ThreadPool.Info threadPoolInfo,
final int queryRecordsQueueSize,
final Map<MetricType, TopQueriesHealthStats> topQueriesHealthStats
final Map<MetricType, TopQueriesHealthStats> topQueriesHealthStats,
final Map<String, Long> fieldTypeCacheStats
) {
if (threadPoolInfo == null || topQueriesHealthStats == null) {
throw new IllegalArgumentException("Parameters cannot be null");
}
this.threadPoolInfo = threadPoolInfo;
this.queryRecordsQueueSize = queryRecordsQueueSize;
this.topQueriesHealthStats = topQueriesHealthStats;
this.fieldTypeCacheStats = Objects.requireNonNull(fieldTypeCacheStats, "fieldTypeCacheStats cannot be null");
}

/**
Expand All @@ -87,6 +102,12 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
builder.endObject();
}
builder.endObject();
// Write field type cache stats
builder.startObject(FIELD_TYPE_CACHE_STATS);
for (String key : List.of(HITS_STRING, MISSES_STRING, COUNT_STRING, BYTES_STRING)) {
builder.field(key, fieldTypeCacheStats.getOrDefault(key, 0L));
}
builder.endObject();
return builder;
}

Expand All @@ -105,6 +126,9 @@ public void writeTo(final StreamOutput out) throws IOException {
MetricType::writeTo,
(streamOutput, topQueriesHealthStats) -> topQueriesHealthStats.writeTo(out)
);
if (out.getVersion().onOrAfter(Version.V_2_19_0)) {
out.writeMap(fieldTypeCacheStats, StreamOutput::writeString, StreamOutput::writeLong);
}
}

/**
Expand Down Expand Up @@ -133,4 +157,13 @@ public int getQueryRecordsQueueSize() {
public Map<MetricType, TopQueriesHealthStats> getTopQueriesHealthStats() {
return topQueriesHealthStats;
}

/**
* Get the field type cache stats.
*
* @return the field type cache stats
*/
public Map<String, Long> getFieldTypeCacheStats() {
return fieldTypeCacheStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.BYTES_STRING;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.COUNT_STRING;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.HITS_STRING;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.MISSES_STRING;

import java.util.List;
import java.util.Map;
Expand All @@ -25,6 +29,7 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.plugin.insights.QueryInsightsTestUtils;
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator;
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
Expand Down Expand Up @@ -59,8 +64,9 @@ public void setup() {
"QueryInsightsHealthStatsTests",
new ScalingExecutorBuilder(QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR, 1, 5, TimeValue.timeValueMinutes(5))
);
ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool);
queryInsightsService = new QueryInsightsService(
new ClusterService(settings, clusterSettings, threadPool),
clusterService,
threadPool,
client,
NoopMetricsRegistry.INSTANCE,
Expand All @@ -69,6 +75,7 @@ public void setup() {
queryInsightsService.enableCollection(MetricType.LATENCY, true);
queryInsightsService.enableCollection(MetricType.CPU, true);
queryInsightsService.enableCollection(MetricType.MEMORY, true);
queryInsightsService.setQueryShapeGenerator(new QueryShapeGenerator(clusterService));
queryInsightsServiceSpy = spy(queryInsightsService);

MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
Expand Down Expand Up @@ -207,5 +214,12 @@ public void testGetHealthStats() {
assertTrue(topQueriesHealthStatsMap.containsKey(MetricType.LATENCY));
assertTrue(topQueriesHealthStatsMap.containsKey(MetricType.CPU));
assertTrue(topQueriesHealthStatsMap.containsKey(MetricType.MEMORY));
Map<String, Long> fieldTypeCacheStats = healthStats.getFieldTypeCacheStats();
assertNotNull(fieldTypeCacheStats);
assertEquals(4, fieldTypeCacheStats.size());
assertTrue(fieldTypeCacheStats.containsKey(HITS_STRING));
assertTrue(fieldTypeCacheStats.containsKey(MISSES_STRING));
assertTrue(fieldTypeCacheStats.containsKey(COUNT_STRING));
assertTrue(fieldTypeCacheStats.containsKey(BYTES_STRING));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public void setup() {
this.healthStats = new QueryInsightsHealthStats(
threadPool.info(QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR),
10,
new HashMap<>(),
new HashMap<>()
);
}
Expand Down
Loading

0 comments on commit c2942ad

Please sign in to comment.