Skip to content

Commit

Permalink
Add field type cache stats
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 committed Jan 17, 2025
1 parent 1d991b8 commit 84a46e2
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public QueryInsightsListener(
this.clusterService = clusterService;
this.queryInsightsService = queryInsightsService;
this.queryShapeGenerator = new QueryShapeGenerator(clusterService);
queryInsightsService.setQueryShapeGenerator(queryShapeGenerator);

// Setting endpoints set up for top n queries, including enabling top n queries, window size, and top n size
// Expected metricTypes are Latency, CPU, and Memory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.opensearch.plugin.insights.core.service;

import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.BYTES_STRING;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.HITS_STRING;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.MISSES_STRING;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER;
Expand All @@ -19,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -36,6 +40,7 @@
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
import org.opensearch.plugin.insights.core.reader.QueryInsightsReaderFactory;
import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator;
import org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer;
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
Expand Down Expand Up @@ -100,9 +105,16 @@ public class QueryInsightsService extends AbstractLifecycleComponent {

private volatile boolean searchQueryMetricsEnabled;

private SearchQueryCategorizer searchQueryCategorizer;
private final SearchQueryCategorizer searchQueryCategorizer;

private NamedXContentRegistry namedXContentRegistry;
private final NamedXContentRegistry namedXContentRegistry;

/**
* Query shape generator instance
*/
private QueryShapeGenerator queryShapeGenerator;

private static final Map<String, Long> EMPTY_FIELD_TYPE_CACHE_STATS = Map.of(HITS_STRING, 0L, MISSES_STRING, 0L, BYTES_STRING, 0L);

/**
* Constructor of the QueryInsightsService
Expand Down Expand Up @@ -496,10 +508,14 @@ public QueryInsightsHealthStats getHealthStats() {
Map<MetricType, TopQueriesHealthStats> topQueriesHealthStatsMap = topQueriesServices.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getHealthStats()));
Map<String, Long> fieldTypeCacheStats = Optional.ofNullable(queryShapeGenerator)
.map(QueryShapeGenerator::getFieldTypeCacheStats)
.orElse(EMPTY_FIELD_TYPE_CACHE_STATS);
return new QueryInsightsHealthStats(
threadPool.info(QUERY_INSIGHTS_EXECUTOR),
this.queryRecordsQueue.size(),
topQueriesHealthStatsMap
topQueriesHealthStatsMap,
fieldTypeCacheStats
);
}

Expand All @@ -511,4 +527,11 @@ private void deleteExpiredTopNIndices() {
topQueriesServices.get(metricType).deleteExpiredTopNIndices(clusterService.state().metadata().indices());
}
}

/**
* Set query shape generator
*/
public void setQueryShapeGenerator(final QueryShapeGenerator queryShapeGenerator) {
this.queryShapeGenerator = queryShapeGenerator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public IndicesFieldTypeCache(Settings settings) {
cache = cacheBuilder.build();
}

public IndexFieldMap getOrInitialize(Index index) {
IndexFieldMap getOrInitialize(Index index) {
try {
return cache.computeIfAbsent(index, k -> new IndexFieldMap());
} catch (ExecutionException ex) {
Expand All @@ -59,9 +59,33 @@ public Iterable<Index> keySet() {
return cache.keys();
}

/**
* Estimated memory consumption of the cache in bytes
*/
public Long getEstimatedSize() {
long totalWeight = 0;

// Iterate over the keys of the cache
for (Index index : cache.keys()) {
// Get the corresponding IndexFieldMap
IndexFieldMap indexFieldMap = cache.get(index);

// Ensure the value is not null before calling weight()
if (indexFieldMap != null) {
totalWeight += indexFieldMap.weight();
}
}

return totalWeight;
}

static class IndexFieldMap {
private ConcurrentHashMap<String, String> fieldTypeMap;
private CounterMetric weight;
private final ConcurrentHashMap<String, String> fieldTypeMap;

/**
* Estimated memory consumption of fieldTypeMap in bytes
*/
private final CounterMetric weight;

IndexFieldMap() {
fieldTypeMap = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,21 @@ public class QueryShapeGenerator implements ClusterStateListener {
static final String EMPTY_STRING = "";
static final String ONE_SPACE_INDENT = " ";
private final ClusterService clusterService;
private final String NO_FIELD_TYPE_VALUE = "";
private final IndicesFieldTypeCache indicesFieldTypeCache;
private long cacheHitCount;
private long cacheMissCount;

private final String NO_FIELD_TYPE_VALUE = "";
public static final String HITS_STRING = "hits";
public static final String MISSES_STRING = "misses";
public static final String BYTES_STRING = "bytes";

public QueryShapeGenerator(ClusterService clusterService) {
this.clusterService = clusterService;
clusterService.addListener(this);
this.indicesFieldTypeCache = new IndicesFieldTypeCache(clusterService.getSettings());
this.cacheHitCount = 0;
this.cacheMissCount = 0;
}

public void clusterChanged(ClusterChangedEvent event) {
Expand Down Expand Up @@ -369,7 +377,10 @@ String getFieldType(String fieldName, Map<String, Object> propertiesAsMap, Index
String fieldType = getFieldTypeFromCache(fieldName, index);

if (fieldType != null) {
cacheHitCount += 1;
return fieldType;
} else {
cacheMissCount += 1;
}

// Retrieve field type from mapping and cache it if found
Expand Down Expand Up @@ -420,4 +431,13 @@ else if (currentMap.containsKey("type")) {
String getFieldTypeFromCache(String fieldName, Index index) {
return indicesFieldTypeCache.getOrInitialize(index).get(fieldName);
}

/**
* Get field type cache stats
*
* @return Map<String, Long> of indices field type cache hits, misses, & bytes
*/
public Map<String, Long> getFieldTypeCacheStats() {
return Map.of(HITS_STRING, cacheHitCount, MISSES_STRING, cacheMissCount, BYTES_STRING, indicesFieldTypeCache.getEstimatedSize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
package org.opensearch.plugin.insights.rules.model.healthStats;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.opensearch.Version;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand All @@ -26,10 +28,12 @@ public class QueryInsightsHealthStats implements ToXContentFragment, Writeable {
private final ThreadPool.Info threadPoolInfo;
private final int queryRecordsQueueSize;
private final Map<MetricType, TopQueriesHealthStats> topQueriesHealthStats;
private Map<String, Long> fieldTypeCacheStats;

private static final String THREAD_POOL_INFO = "ThreadPoolInfo";
private static final String QUERY_RECORDS_QUEUE_SIZE = "QueryRecordsQueueSize";
private static final String TOP_QUERIES_HEALTH_STATS = "TopQueriesHealthStats";
private static final String FIELD_TYPE_CACHE_STATS = "FieldTypeCacheStats";

/**
* Constructor to read QueryInsightsHealthStats from a StreamInput.
Expand All @@ -41,6 +45,9 @@ public QueryInsightsHealthStats(final StreamInput in) throws IOException {
this.threadPoolInfo = new ThreadPool.Info(in);
this.queryRecordsQueueSize = in.readInt();
this.topQueriesHealthStats = in.readMap(MetricType::readFromStream, TopQueriesHealthStats::new);
if (in.getVersion().onOrAfter(Version.V_2_19_0)) {
this.fieldTypeCacheStats = in.readMap(StreamInput::readString, StreamInput::readLong);
}
}

/**
Expand All @@ -53,14 +60,16 @@ public QueryInsightsHealthStats(final StreamInput in) throws IOException {
public QueryInsightsHealthStats(
final ThreadPool.Info threadPoolInfo,
final int queryRecordsQueueSize,
final Map<MetricType, TopQueriesHealthStats> topQueriesHealthStats
final Map<MetricType, TopQueriesHealthStats> topQueriesHealthStats,
final Map<String, Long> fieldTypeCacheStats
) {
if (threadPoolInfo == null || topQueriesHealthStats == null) {
throw new IllegalArgumentException("Parameters cannot be null");
}
this.threadPoolInfo = threadPoolInfo;
this.queryRecordsQueueSize = queryRecordsQueueSize;
this.topQueriesHealthStats = topQueriesHealthStats;
this.fieldTypeCacheStats = fieldTypeCacheStats;
}

/**
Expand All @@ -87,6 +96,12 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
builder.endObject();
}
builder.endObject();
// Write field type cache stats
builder.startObject(FIELD_TYPE_CACHE_STATS);
for (String key : List.of("hits", "misses", "bytes")) {
builder.field(key, fieldTypeCacheStats.getOrDefault(key, 0L));
}
builder.endObject();
return builder;
}

Expand All @@ -105,6 +120,9 @@ public void writeTo(final StreamOutput out) throws IOException {
MetricType::writeTo,
(streamOutput, topQueriesHealthStats) -> topQueriesHealthStats.writeTo(out)
);
if (out.getVersion().onOrAfter(Version.V_2_19_0)) {
out.writeMap(fieldTypeCacheStats, StreamOutput::writeString, StreamOutput::writeLong);
}
}

/**
Expand Down Expand Up @@ -133,4 +151,13 @@ public int getQueryRecordsQueueSize() {
public Map<MetricType, TopQueriesHealthStats> getTopQueriesHealthStats() {
return topQueriesHealthStats;
}

/**
* Get the field type cache stats.
*
* @return the field type cache stats
*/
public Map<String, Long> getFieldTypeCacheStats() {
return fieldTypeCacheStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.BYTES_STRING;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.HITS_STRING;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.MISSES_STRING;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -207,5 +210,11 @@ public void testGetHealthStats() {
assertTrue(topQueriesHealthStatsMap.containsKey(MetricType.LATENCY));
assertTrue(topQueriesHealthStatsMap.containsKey(MetricType.CPU));
assertTrue(topQueriesHealthStatsMap.containsKey(MetricType.MEMORY));
Map<String, Long> fieldTypeCacheStats = healthStats.getFieldTypeCacheStats();
assertNotNull(fieldTypeCacheStats);
assertEquals(3, fieldTypeCacheStats.size());
assertTrue(fieldTypeCacheStats.containsKey(HITS_STRING));
assertTrue(fieldTypeCacheStats.containsKey(MISSES_STRING));
assertTrue(fieldTypeCacheStats.containsKey(BYTES_STRING));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public void setup() {
this.healthStats = new QueryInsightsHealthStats(
threadPool.info(QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR),
10,
new HashMap<>(),
new HashMap<>()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public void setup() {
this.healthStats = new QueryInsightsHealthStats(
threadPool.info(QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR),
10,
new HashMap<>(),
new HashMap<>()
);
}
Expand Down Expand Up @@ -113,7 +114,7 @@ public void testToXContent() throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
String expectedJson =
"{\"node_for_health_stats_test\":{\"ThreadPoolInfo\":{\"query_insights_executor\":{\"type\":\"scaling\",\"core\":1,\"max\":5,\"keep_alive\":\"5m\",\"queue_size\":-1}},\"QueryRecordsQueueSize\":10,\"TopQueriesHealthStats\":{}}}";
"{\"node_for_health_stats_test\":{\"ThreadPoolInfo\":{\"query_insights_executor\":{\"type\":\"scaling\",\"core\":1,\"max\":5,\"keep_alive\":\"5m\",\"queue_size\":-1}},\"QueryRecordsQueueSize\":10,\"TopQueriesHealthStats\":{},\"FieldTypeCacheStats\":{\"hits\":0,\"misses\":0,\"bytes\":0}}}";
assertEquals(expectedJson, builder.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

package org.opensearch.plugin.insights.rules.model.healthStats;

import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.BYTES_STRING;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.HITS_STRING;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.MISSES_STRING;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -34,6 +38,7 @@ public class QueryInsightsHealthStatsTests extends OpenSearchTestCase {
private ThreadPool.Info threadPoolInfo;
private int queryRecordsQueueSize;
private Map<MetricType, TopQueriesHealthStats> topQueriesHealthStats;
private Map<String, Long> fieldTypeCacheStats;

@Before
public void setUpQueryInsightsHealthStats() {
Expand All @@ -45,6 +50,7 @@ public void setUpQueryInsightsHealthStats() {
queryRecordsQueueSize = 100;
topQueriesHealthStats = new HashMap<>();
topQueriesHealthStats.put(MetricType.LATENCY, new TopQueriesHealthStats(10, new QueryGrouperHealthStats(20, 15)));
fieldTypeCacheStats = Map.of(HITS_STRING, 5L, MISSES_STRING, 3L, BYTES_STRING, 300L);
}

@Override
Expand All @@ -54,15 +60,25 @@ public void tearDown() throws Exception {
}

public void testConstructorAndGetters() {
QueryInsightsHealthStats healthStats = new QueryInsightsHealthStats(threadPoolInfo, queryRecordsQueueSize, topQueriesHealthStats);
QueryInsightsHealthStats healthStats = new QueryInsightsHealthStats(
threadPoolInfo,
queryRecordsQueueSize,
topQueriesHealthStats,
fieldTypeCacheStats
);
assertNotNull(healthStats);
assertEquals(threadPoolInfo, healthStats.getThreadPoolInfo());
assertEquals(queryRecordsQueueSize, healthStats.getQueryRecordsQueueSize());
assertEquals(topQueriesHealthStats, healthStats.getTopQueriesHealthStats());
}

public void testSerialization() throws IOException {
QueryInsightsHealthStats healthStats = new QueryInsightsHealthStats(threadPoolInfo, queryRecordsQueueSize, topQueriesHealthStats);
QueryInsightsHealthStats healthStats = new QueryInsightsHealthStats(
threadPoolInfo,
queryRecordsQueueSize,
topQueriesHealthStats,
fieldTypeCacheStats
);
// Write to StreamOutput
BytesStreamOutput out = new BytesStreamOutput();
healthStats.writeTo(out);
Expand All @@ -75,7 +91,12 @@ public void testSerialization() throws IOException {
}

public void testToXContent() throws IOException {
QueryInsightsHealthStats healthStats = new QueryInsightsHealthStats(threadPoolInfo, queryRecordsQueueSize, topQueriesHealthStats);
QueryInsightsHealthStats healthStats = new QueryInsightsHealthStats(
threadPoolInfo,
queryRecordsQueueSize,
topQueriesHealthStats,
fieldTypeCacheStats
);
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();

Expand All @@ -100,6 +121,11 @@ public void testToXContent() throws IOException {
+ " \"QueryGroupCount_Total\": 20,\n"
+ " \"QueryGroupCount_MaxHeap\": 15\n"
+ " }\n"
+ " },\n"
+ " \"FieldTypeCacheStats\": {\n"
+ " \"hits\": 5,\n"
+ " \"misses\": 3,\n"
+ " \"bytes\": 300\n"
+ " }\n"
+ "}";
assertEquals(expectedJson.replaceAll("\\s", ""), jsonOutput.replaceAll("\\s", ""));
Expand Down

0 comments on commit 84a46e2

Please sign in to comment.