Skip to content

Commit

Permalink
Add fetch top queries by id API (#195) (#201)
Browse files Browse the repository at this point in the history
(cherry picked from commit 210ce5d)

Signed-off-by: Siddhant Deshmukh <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 59360b1 commit b163c36
Show file tree
Hide file tree
Showing 13 changed files with 156 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.MatchQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
Expand Down Expand Up @@ -84,11 +84,12 @@ public LocalIndexReader setIndexPattern(DateTimeFormatter indexPattern) {
* Export a list of SearchQueryRecord from local index
*
* @param from start timestamp
* @param to end timestamp
* @param to end timestamp
* @param id query/group id
* @return list of SearchQueryRecords whose timestamps fall between from and to
*/
@Override
public List<SearchQueryRecord> read(final String from, final String to) {
public List<SearchQueryRecord> read(final String from, final String to, String id) {
List<SearchQueryRecord> records = new ArrayList<>();
if (from == null || to == null) {
return records;
Expand All @@ -108,7 +109,11 @@ public List<SearchQueryRecord> read(final String from, final String to) {
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("timestamp")
.from(start.toInstant().toEpochMilli())
.to(end.toInstant().toEpochMilli());
QueryBuilder query = QueryBuilders.boolQuery().must(rangeQuery).mustNot(excludeQuery);
BoolQueryBuilder query = QueryBuilders.boolQuery().must(rangeQuery).mustNot(excludeQuery);

if (id != null) {
query.must(QueryBuilders.matchQuery("id", id));
}
searchSourceBuilder.query(query);
searchRequest.source(searchSourceBuilder);
try {
Expand All @@ -124,7 +129,6 @@ public List<SearchQueryRecord> read(final String from, final String to) {
logger.error("Unable to parse search hit: ", e);
}
curr = curr.plusDays(1);

}
return records;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ public interface QueryInsightsReader extends Closeable {
* Reader a list of SearchQueryRecord
*
* @param from string
* @param to string
* @param to string
* @param id query/group id
* @return List of SearchQueryRecord
*/
List<SearchQueryRecord> read(final String from, final String to);
List<SearchQueryRecord> read(final String from, final String to, final String id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,11 +357,16 @@ public void validateExporterAndReaderConfig(Settings settings) {
* @param includeLastWindow if the top N queries from the last window should be included
* @param from start timestamp
* @param to end timestamp
* @param id unique identifier for query/query group
* @return List of the records that are in the query insight store
* @throws IllegalArgumentException if query insights is disabled in the cluster
*/
public List<SearchQueryRecord> getTopQueriesRecords(final boolean includeLastWindow, final String from, final String to)
throws IllegalArgumentException {
public List<SearchQueryRecord> getTopQueriesRecords(
final boolean includeLastWindow,
final String from,
final String to,
final String id
) throws IllegalArgumentException {
OperationalMetricsCounter.getInstance()
.incrementCounter(
OperationalMetric.TOP_N_QUERIES_USAGE_COUNT,
Expand All @@ -380,13 +385,21 @@ public List<SearchQueryRecord> getTopQueriesRecords(final boolean includeLastWin
queries.addAll(topQueriesHistorySnapshot.get());
}
List<SearchQueryRecord> filterQueries = queries;

// Time-based filtering
if (from != null && to != null) {
final ZonedDateTime start = ZonedDateTime.parse(from);
final ZonedDateTime end = ZonedDateTime.parse(to);
Predicate<SearchQueryRecord> timeFilter = element -> start.toInstant().toEpochMilli() <= element.getTimestamp()
&& element.getTimestamp() <= end.toInstant().toEpochMilli();
filterQueries = queries.stream().filter(checkIfInternal.and(timeFilter)).collect(Collectors.toList());
}

// Filter based on the id, if provided
if (id != null) {
filterQueries = filterQueries.stream().filter(record -> record.getId().equals(id)).collect(Collectors.toList());
}

return Stream.of(filterQueries)
.flatMap(Collection::stream)
.sorted((a, b) -> SearchQueryRecord.compare(a, b, metricType) * -1)
Expand All @@ -399,11 +412,13 @@ public List<SearchQueryRecord> getTopQueriesRecords(final boolean includeLastWin
* By default, return the records in sorted order.
*
* @param from start timestamp
* @param to end timestamp
* @param to end timestamp
* @param id search query record id
* @return List of the records that are in local index (if enabled) with timestamps between from and to
* @throws IllegalArgumentException if query insights is disabled in the cluster
*/
public List<SearchQueryRecord> getTopQueriesRecordsFromIndex(final String from, final String to) throws IllegalArgumentException {
public List<SearchQueryRecord> getTopQueriesRecordsFromIndex(final String from, final String to, final String id)
throws IllegalArgumentException {
if (!enabled) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Cannot get top n queries for [%s] when it is not enabled.", metricType.toString())
Expand All @@ -415,7 +430,7 @@ public List<SearchQueryRecord> getTopQueriesRecordsFromIndex(final String from,
try {
final ZonedDateTime start = ZonedDateTime.parse(from);
final ZonedDateTime end = ZonedDateTime.parse(to);
List<SearchQueryRecord> records = reader.read(from, to);
List<SearchQueryRecord> records = reader.read(from, to, id);
Predicate<SearchQueryRecord> timeFilter = element -> start.toInstant().toEpochMilli() <= element.getTimestamp()
&& element.getTimestamp() <= end.toInstant().toEpochMilli();
List<SearchQueryRecord> filteredRecords = records.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class TopQueriesRequest extends BaseNodesRequest<TopQueriesRequest> {
final MetricType metricType;
final String from;
final String to;
final String id;

/**
* Constructor for TopQueriesRequest
Expand All @@ -34,6 +35,7 @@ public TopQueriesRequest(final StreamInput in) throws IOException {
this.metricType = MetricType.readFromStream(in);
this.from = null;
this.to = null;
this.id = null;
}

/**
Expand All @@ -45,11 +47,12 @@ public TopQueriesRequest(final StreamInput in) throws IOException {
* @param to end timestamp
* @param nodesIds the nodeIds specified in the request
*/
public TopQueriesRequest(final MetricType metricType, final String from, final String to, final String... nodesIds) {
public TopQueriesRequest(final MetricType metricType, final String from, final String to, final String id, final String... nodesIds) {
super(nodesIds);
this.metricType = metricType;
this.from = from;
this.to = to;
this.id = id;
}

/**
Expand All @@ -76,6 +79,14 @@ public String getTo() {
return to;
}

/**
* Get id which is the query_id and query_group_id
* @return String of to timestamp
*/
public String getId() {
return id;
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,15 @@ public long getTimestamp() {
return timestamp;
}

/**
* Returns the id.
*
* @return the id
*/
public String getId() {
return id;
}

/**
* Returns the measurement associated with the specified name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,40 +79,46 @@ static TopQueriesRequest prepareRequest(final RestRequest request) {
final String metricType = request.param("type", MetricType.LATENCY.toString());
final String from = request.param("from", null);
final String to = request.param("to", null);
final String id = request.param("id", null);
if (!ALLOWED_METRICS.contains(metricType)) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "request [%s] contains invalid metric type [%s]", request.path(), metricType)
);
}
if (from != null || to != null) {
if (from != null ^ to != null) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "request [%s] is missing one of the time parameters. Both must be provided", request.path())
);
}
if (isNotISODate(from)) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"request [%s] contains invalid 'from' date format. Expected ISO8601 format string (YYYY-MM-DD'T'HH:mm:ss.SSSZ): [%s]",
request.path(),
from
)
);
}
if (isNotISODate(to)) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"request [%s] contains invalid 'to' date format. Expected ISO8601 format string (YYYY-MM-DD'T'HH:mm:ss.SSSZ): [%s]",
request.path(),
to
)
);
}
boolean isTimeRangeProvided = from != null || to != null;
if (isTimeRangeProvided) {
validateTimeRange(request, from, to);
}

return new TopQueriesRequest(MetricType.fromString(metricType), from, to, nodesIds);
return new TopQueriesRequest(MetricType.fromString(metricType), from, to, id, nodesIds);
}

private static void validateTimeRange(RestRequest request, String from, String to) {
if (from != null ^ to != null) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "request [%s] is missing one of the time parameters. Both must be provided", request.path())
);
}
if (isNotISODate(from)) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"request [%s] contains invalid 'from' date format. Expected ISO8601 format string (YYYY-MM-DD'T'HH:mm:ss.SSSZ): [%s]",
request.path(),
from
)
);
}
if (isNotISODate(to)) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"request [%s] contains invalid 'to' date format. Expected ISO8601 format string (YYYY-MM-DD'T'HH:mm:ss.SSSZ): [%s]",
request.path(),
to
)
);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ protected TopQueriesResponse newResponse(
}
final String from = topQueriesRequest.getFrom();
final String to = topQueriesRequest.getTo();
final String id = topQueriesRequest.getId();
if (from != null && to != null) {
responses.add(
new TopQueries(
clusterService.localNode(),
queryInsightsService.getTopQueriesService(topQueriesRequest.getMetricType()).getTopQueriesRecordsFromIndex(from, to)
queryInsightsService.getTopQueriesService(topQueriesRequest.getMetricType()).getTopQueriesRecordsFromIndex(from, to, id)
)
);
}
Expand All @@ -114,9 +115,10 @@ protected TopQueries nodeOperation(final NodeRequest nodeRequest) {
final TopQueriesRequest request = nodeRequest.request;
final String from = request.getFrom();
final String to = request.getTo();
final String id = request.getId();
return new TopQueries(
clusterService.localNode(),
queryInsightsService.getTopQueriesService(request.getMetricType()).getTopQueriesRecords(true, from, to)
queryInsightsService.getTopQueriesService(request.getMetricType()).getTopQueriesRecords(true, from, to, id)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
Expand All @@ -54,15 +55,26 @@

final public class QueryInsightsTestUtils {

static String randomId = UUID.randomUUID().toString();

public QueryInsightsTestUtils() {}

/**
* Returns list of randomly generated search query records with a specific id
* @param count number of records
* @return List of records
*/
public static List<SearchQueryRecord> generateQueryInsightRecords(int count, String id) {
return generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0, AggregationType.DEFAULT_AGGREGATION_TYPE, id);
}

/**
* Returns list of randomly generated search query records.
* @param count number of records
* @return List of records
*/
public static List<SearchQueryRecord> generateQueryInsightRecords(int count) {
return generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0, AggregationType.DEFAULT_AGGREGATION_TYPE);
return generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0, AggregationType.DEFAULT_AGGREGATION_TYPE, randomId);
}

/**
Expand All @@ -77,7 +89,8 @@ public static List<SearchQueryRecord> generateQueryInsightRecords(int count, Sea
count,
System.currentTimeMillis(),
0,
AggregationType.DEFAULT_AGGREGATION_TYPE
AggregationType.DEFAULT_AGGREGATION_TYPE,
randomId
);
for (SearchQueryRecord record : records) {
record.getAttributes().put(Attribute.SOURCE, searchSourceBuilder);
Expand All @@ -92,14 +105,14 @@ public static List<SearchQueryRecord> generateQueryInsightRecords(int count, Sea
* @return List of records
*/
public static List<SearchQueryRecord> generateQueryInsightRecords(int count, AggregationType aggregationType) {
return generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0, aggregationType);
return generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0, aggregationType, randomId);
}

/**
* Creates a List of random Query Insight Records for testing purpose
*/
public static List<SearchQueryRecord> generateQueryInsightRecords(int lower, int upper, long startTimeStamp, long interval) {
return generateQueryInsightRecords(lower, upper, startTimeStamp, interval, AggregationType.NONE);
return generateQueryInsightRecords(lower, upper, startTimeStamp, interval, AggregationType.NONE, randomId);
}

/**
Expand All @@ -110,7 +123,8 @@ public static List<SearchQueryRecord> generateQueryInsightRecords(
int upper,
long startTimeStamp,
long interval,
AggregationType aggregationType
AggregationType aggregationType,
String id
) {
List<SearchQueryRecord> records = new ArrayList<>();
int countOfRecords = randomIntBetween(lower, upper);
Expand Down Expand Up @@ -161,7 +175,7 @@ public static List<SearchQueryRecord> generateQueryInsightRecords(
)
);

records.add(new SearchQueryRecord(timestamp, measurements, attributes));
records.add(new SearchQueryRecord(timestamp, measurements, attributes, id));
timestamp += interval;
}
return records;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ public void testReadRecords() {
when(responseActionFuture.actionGet()).thenReturn(searchResponse);
when(client.search(any(SearchRequest.class))).thenReturn(responseActionFuture);
String time = ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_DATE_TIME);
String id = "example-hashcode";
List<SearchQueryRecord> records = List.of();
try {
records = localIndexReader.read(time, time);
records = localIndexReader.read(time, time, id);
} catch (Exception e) {
fail("No exception should be thrown when reading query insights data");
}
Expand Down
Loading

0 comments on commit b163c36

Please sign in to comment.