Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-17141: Create CpuQueryLimit implementation #2244

Merged
merged 21 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Other Changes
================== 9.6.0 ==================
New Features
---------------------
(No changes)
* SOLR-17141: Implement 'cpuAllowed' query parameter to limit the maximum CPU usage by a running query. (Andrzej Bialecki, Gus Heck, David Smiley)

Improvements
---------------------
Expand Down
23 changes: 13 additions & 10 deletions solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.SyntaxError;
import org.apache.solr.security.PermissionNameProvider;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.util.SolrPluginUtils;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.ThreadStats;
import org.apache.solr.util.ThreadCpuTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -76,7 +77,7 @@ public abstract class RequestHandlerBase

private PluginInfo pluginInfo;

protected boolean publishCpuTime = Boolean.getBoolean(ThreadStats.ENABLE_CPU_TIME);
protected boolean publishCpuTime = Boolean.getBoolean(ThreadCpuTimer.ENABLE_CPU_TIME);

@SuppressForbidden(reason = "Need currentTimeMillis, used only for stats output")
public RequestHandlerBase() {
Expand Down Expand Up @@ -216,11 +217,13 @@ public abstract void handleRequestBody(SolrQueryRequest req, SolrQueryResponse r

@Override
public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {
ThreadStats cpuStats = null;
ThreadCpuTimer threadCpuTimer = null;
if (publishCpuTime) {
cpuStats = new ThreadStats();
threadCpuTimer =
SolrRequestInfo.getRequestInfo() == null
? new ThreadCpuTimer()
: SolrRequestInfo.getRequestInfo().getThreadCpuTimer();
}

HandlerMetrics metrics = getMetricsForThisRequest(req);
metrics.requests.inc();

Expand Down Expand Up @@ -250,17 +253,17 @@ public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {
long elapsed = timer.stop();
metrics.totalTime.inc(elapsed);

if (cpuStats != null) {
Optional<Long> cpuTime = cpuStats.getCpuTimeMs();
if (publishCpuTime) {
sigram marked this conversation as resolved.
Show resolved Hide resolved
Optional<Long> cpuTime = threadCpuTimer.getCpuTimeMs();
if (cpuTime.isPresent()) {
// add CPU_TIME if not already added by SearchHandler
NamedList<Object> header = rsp.getResponseHeader();
sigram marked this conversation as resolved.
Show resolved Hide resolved
if (header != null) {
if (header.get(ThreadStats.CPU_TIME) == null) {
header.add(ThreadStats.CPU_TIME, cpuTime.get());
if (header.get(ThreadCpuTimer.CPU_TIME) == null) {
header.add(ThreadCpuTimer.CPU_TIME, cpuTime.get());
}
}
rsp.addToLog(ThreadStats.LOCAL_CPU_TIME, cpuTime.get());
rsp.addToLog(ThreadCpuTimer.LOCAL_CPU_TIME, cpuTime.get());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import org.apache.solr.security.PermissionNameProvider;
import org.apache.solr.util.RTimerTree;
import org.apache.solr.util.SolrPluginUtils;
import org.apache.solr.util.ThreadStats;
import org.apache.solr.util.ThreadCpuTimer;
import org.apache.solr.util.circuitbreaker.CircuitBreaker;
import org.apache.solr.util.circuitbreaker.CircuitBreakerRegistry;
import org.apache.solr.util.plugin.PluginInfoInitialized;
Expand Down Expand Up @@ -622,8 +622,8 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw
} while (nextStage != Integer.MAX_VALUE);

if (publishCpuTime) {
rsp.getResponseHeader().add(ThreadStats.CPU_TIME, totalShardCpuTime);
rsp.addToLog(ThreadStats.CPU_TIME, totalShardCpuTime);
rsp.getResponseHeader().add(ThreadCpuTimer.CPU_TIME, totalShardCpuTime);
rsp.addToLog(ThreadCpuTimer.CPU_TIME, totalShardCpuTime);
}
}

Expand Down Expand Up @@ -677,7 +677,7 @@ private long computeShardCpuTime(List<ShardResponse> responses) {
(SimpleOrderedMap<Object>)
response.getSolrResponse().getResponse().get(SolrQueryResponse.RESPONSE_HEADER_KEY);
if (header != null) {
Long shardCpuTime = (Long) header.get(ThreadStats.CPU_TIME);
Long shardCpuTime = (Long) header.get(ThreadCpuTimer.CPU_TIME);
if (shardCpuTime != null) {
totalShardCpuTime += shardCpuTime;
}
Expand Down
28 changes: 24 additions & 4 deletions solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.QueryLimits;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.util.ThreadCpuTimer;
import org.apache.solr.util.TimeZoneUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,6 +46,7 @@ public class SolrRequestInfo {
private static final ThreadLocal<Deque<SolrRequestInfo>> threadLocal =
ThreadLocal.withInitial(ArrayDeque::new);
static final Object LIMITS_KEY = new Object();
static final Object CPU_TIMER_KEY = new Object();

private int refCount = 1; // prevent closing when still used

Expand Down Expand Up @@ -78,11 +80,13 @@ public static void setRequestInfo(SolrRequestInfo info) {
assert false : "SolrRequestInfo Stack is full";
log.error("SolrRequestInfo Stack is full");
} else if (!stack.isEmpty() && info.req != null) {
// New SRI instances inherit limits from prior SRI regardless of parameters.
// This ensures limits cannot be changed or removed for a given thread once set.
// if req is null limits will be an empty instance with no limits anyway.
// New SRI instances inherit limits and thread CPU from prior SRI regardless of parameters.
// This ensures these two properties cannot be changed or removed for a given thread once set.
// if req is null then limits will be an empty instance with no limits anyway.
info.req.getContext().put(CPU_TIMER_KEY, stack.peek().getThreadCpuTimer());
info.req.getContext().put(LIMITS_KEY, stack.peek().getLimits());
}
// this creates both a new QueryLimits and a new ThreadCpuTimer if not already set
info.initQueryLimits();
log.trace("{} {}", info, "setRequestInfo()");
assert !info.isClosed() : "SRI is already closed (odd).";
Expand Down Expand Up @@ -236,14 +240,30 @@ private void initQueryLimits() {
* empty) {@link QueryLimits} object if it has not been created, and will then return the same
* object on every subsequent invocation.
*
* @return The {@code QueryLimits} object for the current requet.
* @return The {@code QueryLimits} object for the current request.
*/
public QueryLimits getLimits() {
  // Touch the CPU timer first so that a ThreadCpuTimer is always initialized for this request.
  getThreadCpuTimer();
  if (req == null) {
    // Without a request there is nothing to read limits from.
    return QueryLimits.NONE;
  }
  // Lazily create the limits once and cache them in the request context under LIMITS_KEY.
  return (QueryLimits) req.getContext().computeIfAbsent(LIMITS_KEY, key -> new QueryLimits(req));
}

/**
 * Returns the thread CPU time monitor associated with the current request.
 *
 * <p>When this info wraps a request, the timer is stored in the request context under {@code
 * CPU_TIMER_KEY}: it is created on first access and the same instance is returned afterwards.
 * When there is no request, a fresh {@link ThreadCpuTimer} is returned on every call.
 *
 * @return the {@link ThreadCpuTimer} object for the current request.
 */
public ThreadCpuTimer getThreadCpuTimer() {
  if (req == null) {
    // No request context to cache in, so hand out a standalone timer.
    return new ThreadCpuTimer();
  }
  Object timer = req.getContext().computeIfAbsent(CPU_TIMER_KEY, key -> new ThreadCpuTimer());
  return (ThreadCpuTimer) timer;
}

public SolrDispatchFilter.Action getAction() {
return action;
}
Expand Down
84 changes: 84 additions & 0 deletions solr/core/src/java/org/apache/solr/search/CpuAllowedLimit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.search;

import com.google.common.annotations.VisibleForTesting;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.index.QueryTimeout;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.util.ThreadCpuTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link QueryTimeout} that signals exit once the thread CPU time used for a given
 * SolrQueryRequest goes past the budget given by the {@code cpuAllowed} query parameter.
 */
public class CpuAllowedLimit implements QueryTimeout {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private final long limitAtNs;
  private final ThreadCpuTimer threadCpuTimer;

  /**
   * Builds the CPU time limit for the current request. The thread CPU timer is taken from the
   * current {@link SolrRequestInfo} (see {@link SolrRequestInfo#getThreadCpuTimer()}) when one is
   * available; otherwise a new timer is created.
   *
   * @param req solr request with a {@code cpuAllowed} parameter
   * @throws IllegalArgumentException if thread CPU time monitoring is not supported, or if the
   *     request carries no positive {@code cpuAllowed} value
   */
  public CpuAllowedLimit(SolrQueryRequest req) {
    if (!ThreadCpuTimer.isSupported()) {
      throw new IllegalArgumentException("Thread CPU time monitoring is not available.");
    }
    final SolrRequestInfo currentInfo = SolrRequestInfo.getRequestInfo();
    if (currentInfo == null) {
      this.threadCpuTimer = new ThreadCpuTimer();
    } else {
      this.threadCpuTimer = currentInfo.getThreadCpuTimer();
    }

    final long allowedCpuMs = req.getParams().getLong(CommonParams.CPU_ALLOWED, -1L);
    if (allowedCpuMs <= 0L) {
      throw new IllegalArgumentException(
          "Check for limit with hasCpuLimit(req) before creating a CpuAllowedLimit");
    }

    // The deadline is measured from the timer's start value, so CPU time already spent
    // on this request counts against the budget.
    final long startNs = this.threadCpuTimer.getStartCpuTimeNs();
    this.limitAtNs = startNs + TimeUnit.MILLISECONDS.toNanos(allowedCpuMs);
  }

  /**
   * Test-only constructor: uses a new timer and places the deadline {@code limitMs} milliseconds
   * past that timer's current CPU time reading.
   *
   * @param limitMs the CPU time budget in milliseconds
   */
  @VisibleForTesting
  CpuAllowedLimit(long limitMs) {
    this.threadCpuTimer = new ThreadCpuTimer();
    final long nowNs = this.threadCpuTimer.getCurrentCpuTimeNs();
    this.limitAtNs = nowNs + TimeUnit.MILLISECONDS.toNanos(limitMs);
  }

  /** Return true if the current request has a parameter with a valid value of the limit. */
  static boolean hasCpuLimit(SolrQueryRequest req) {
    final long allowedCpuMs = req.getParams().getLong(CommonParams.CPU_ALLOWED, -1L);
    return allowedCpuMs > 0L;
  }

  /** Return true if a max limit value is set and the current usage has exceeded the limit. */
  @Override
  public boolean shouldExit() {
    // Compare via the sign of the difference rather than comparing the two values directly.
    final long remainingNs = limitAtNs - threadCpuTimer.getCurrentCpuTimeNs();
    return remainingNs < 0L;
  }
}
8 changes: 6 additions & 2 deletions solr/core/src/java/org/apache/solr/search/QueryLimits.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
*/
package org.apache.solr.search;

import static org.apache.solr.search.SolrQueryTimeLimit.hasTimeLimit;
import static org.apache.solr.search.CpuAllowedLimit.hasCpuLimit;
import static org.apache.solr.search.TimeAllowedLimit.hasTimeLimit;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -44,7 +45,10 @@ private QueryLimits() {}
*/
public QueryLimits(SolrQueryRequest req) {
if (hasTimeLimit(req)) {
limits.add(new SolrQueryTimeLimit(req));
limits.add(new TimeAllowedLimit(req));
}
if (hasCpuLimit(req)) {
limits.add(new CpuAllowedLimit(req));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
* the {@code timeAllowed} query parameter. Note that timeAllowed will be ignored for
* <strong><em>local</em></strong> processing of sub-queries in cases where the parent query already
* has {@code timeAllowed} set. Essentially only one timeAllowed can be specified for any thread
* executing a query. This is to ensure that subqueies don't escape from the intended limit
* executing a query. This is to ensure that subqueries don't escape from the intended limit
*/
public class SolrQueryTimeLimit implements QueryTimeout {
public class TimeAllowedLimit implements QueryTimeout {

private final long timeoutAt;

Expand All @@ -42,23 +42,25 @@ public class SolrQueryTimeLimit implements QueryTimeout {
* should be validated with {@link #hasTimeLimit(SolrQueryRequest)} prior to constructing this
* object
*/
public SolrQueryTimeLimit(SolrQueryRequest req) {
public TimeAllowedLimit(SolrQueryRequest req) {
// reduce by time already spent
long reqTimeAllowed = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1L);

if (reqTimeAllowed == -1L) {
throw new IllegalArgumentException(
"Check for limit with hasTimeLimit(req) before creating a SolrQueryTimeLimit");
"Check for limit with hasTimeLimit(req) before creating a TimeAllowedLimit");
}
long timeAllowed = reqTimeAllowed - (long) req.getRequestTimer().getTime();
long nanosAllowed = TimeUnit.NANOSECONDS.convert(timeAllowed, TimeUnit.MILLISECONDS);
timeoutAt = nanoTime() + nanosAllowed;
}

/**
 * Tells whether the request carries a usable {@code timeAllowed} value.
 *
 * @param req the request whose parameters are inspected
 * @return true if the {@code timeAllowed} parameter is present and not negative
 */
static boolean hasTimeLimit(SolrQueryRequest req) {
  // -1 is the "not specified" default.
  final long requestedMs = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1L);
  return requestedMs >= 0L;
}

/** Return true if a max limit value is set and the current usage has exceeded the limit. */
@Override
public boolean shouldExit() {
return timeoutAt - nanoTime() < 0L;
Expand Down
Loading
Loading