Skip to content

Commit

Permalink
SOLR-17141 branch_9x: Create CpuQueryLimit implementation (#2287)
Browse files Browse the repository at this point in the history
* SOLR-17141: Create CpuQueryLimit implementation (#2244)
* Refactor to fix ThreadStats / ThreadCpuTimer nesting and use it in CpuQueryTimeLimit.
* Rename classes to better reflect the type of limit.
  • Loading branch information
sigram authored Feb 20, 2024
1 parent c42eba8 commit a667bb4
Show file tree
Hide file tree
Showing 10 changed files with 679 additions and 12 deletions.
2 changes: 1 addition & 1 deletion solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ https://github.com/apache/solr/blob/main/solr/solr-ref-guide/modules/upgrade-not
================== 9.6.0 ==================
New Features
---------------------
(No changes)
* SOLR-17141: Implement 'cpuAllowed' query parameter to limit the maximum CPU usage by a running query. (Andrzej Bialecki, Gus Heck, David Smiley)

Improvements
---------------------
Expand Down
28 changes: 24 additions & 4 deletions solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.QueryLimits;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.util.ThreadCpuTimer;
import org.apache.solr.util.TimeZoneUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,6 +46,7 @@ public class SolrRequestInfo {
private static final ThreadLocal<Deque<SolrRequestInfo>> threadLocal =
ThreadLocal.withInitial(ArrayDeque::new);
static final Object LIMITS_KEY = new Object();
static final Object CPU_TIMER_KEY = new Object();

private int refCount = 1; // prevent closing when still used

Expand Down Expand Up @@ -78,11 +80,13 @@ public static void setRequestInfo(SolrRequestInfo info) {
assert false : "SolrRequestInfo Stack is full";
log.error("SolrRequestInfo Stack is full");
} else if (!stack.isEmpty() && info.req != null) {
// New SRI instances inherit limits from prior SRI regardless of parameters.
// This ensures limits cannot be changed or removed for a given thread once set.
// if req is null limits will be an empty instance with no limits anyway.
// New SRI instances inherit limits and thread CPU from prior SRI regardless of parameters.
// This ensures these two properties cannot be changed or removed for a given thread once set.
// if req is null then limits will be an empty instance with no limits anyway.
info.req.getContext().put(CPU_TIMER_KEY, stack.peek().getThreadCpuTimer());
info.req.getContext().put(LIMITS_KEY, stack.peek().getLimits());
}
// this creates both new QueryLimits and new ThreadCpuTime if not already set
info.initQueryLimits();
log.trace("{} {}", info, "setRequestInfo()");
assert !info.isClosed() : "SRI is already closed (odd).";
Expand Down Expand Up @@ -236,14 +240,30 @@ private void initQueryLimits() {
* empty) {@link QueryLimits} object if it has not been created, and will then return the same
* object on every subsequent invocation.
*
* @return The {@code QueryLimits} object for the current requet.
* @return The {@code QueryLimits} object for the current request.
*/
public QueryLimits getLimits() {
// make sure the ThreadCpuTime is always initialized
getThreadCpuTimer();
return req == null
? QueryLimits.NONE
: (QueryLimits) req.getContext().computeIfAbsent(LIMITS_KEY, (k) -> new QueryLimits(req));
}

/**
* Get the thread CPU time monitor for the current request. This will either trigger the creation
* of a new instance if it hasn't been yet created, or will retrieve the already existing instance
* from the "bottom" of the request stack.
*
* @return the {@link ThreadCpuTimer} object for the current request.
*/
public ThreadCpuTimer getThreadCpuTimer() {
return req == null
? new ThreadCpuTimer()
: (ThreadCpuTimer)
req.getContext().computeIfAbsent(CPU_TIMER_KEY, k -> new ThreadCpuTimer());
}

public SolrDispatchFilter.Action getAction() {
return action;
}
Expand Down
84 changes: 84 additions & 0 deletions solr/core/src/java/org/apache/solr/search/CpuAllowedLimit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.search;

import com.google.common.annotations.VisibleForTesting;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.index.QueryTimeout;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.util.ThreadCpuTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Enforces a CPU-time based timeout on a given SolrQueryRequest, as specified by the {@code
* cpuAllowed} query parameter.
*/
public class CpuAllowedLimit implements QueryTimeout {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final long limitAtNs;
private final ThreadCpuTimer threadCpuTimer;

/**
* Create an object to represent a CPU time limit for the current request. NOTE: this
* implementation will attempt to obtain an existing thread CPU time monitor, created when {@link
* SolrRequestInfo#getThreadCpuTimer()} is initialized.
*
* @param req solr request with a {@code cpuAllowed} parameter
*/
public CpuAllowedLimit(SolrQueryRequest req) {
if (!ThreadCpuTimer.isSupported()) {
throw new IllegalArgumentException("Thread CPU time monitoring is not available.");
}
SolrRequestInfo solrRequestInfo = SolrRequestInfo.getRequestInfo();
threadCpuTimer =
solrRequestInfo != null ? solrRequestInfo.getThreadCpuTimer() : new ThreadCpuTimer();
long reqCpuLimit = req.getParams().getLong(CommonParams.CPU_ALLOWED, -1L);

if (reqCpuLimit <= 0L) {
throw new IllegalArgumentException(
"Check for limit with hasCpuLimit(req) before creating a CpuAllowedLimit");
}
// calculate when the time limit is reached, account for the time already spent
limitAtNs =
threadCpuTimer.getStartCpuTimeNs()
+ TimeUnit.NANOSECONDS.convert(reqCpuLimit, TimeUnit.MILLISECONDS);
}

@VisibleForTesting
CpuAllowedLimit(long limitMs) {
this.threadCpuTimer = new ThreadCpuTimer();
limitAtNs =
threadCpuTimer.getCurrentCpuTimeNs()
+ TimeUnit.NANOSECONDS.convert(limitMs, TimeUnit.MILLISECONDS);
}

/** Return true if the current request has a parameter with a valid value of the limit. */
static boolean hasCpuLimit(SolrQueryRequest req) {
return req.getParams().getLong(CommonParams.CPU_ALLOWED, -1L) > 0L;
}

/** Return true if a max limit value is set and the current usage has exceeded the limit. */
@Override
public boolean shouldExit() {
return limitAtNs - threadCpuTimer.getCurrentCpuTimeNs() < 0L;
}
}
8 changes: 6 additions & 2 deletions solr/core/src/java/org/apache/solr/search/QueryLimits.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
*/
package org.apache.solr.search;

import static org.apache.solr.search.SolrQueryTimeLimit.hasTimeLimit;
import static org.apache.solr.search.CpuAllowedLimit.hasCpuLimit;
import static org.apache.solr.search.TimeAllowedLimit.hasTimeLimit;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -44,7 +45,10 @@ private QueryLimits() {}
*/
public QueryLimits(SolrQueryRequest req) {
if (hasTimeLimit(req)) {
limits.add(new SolrQueryTimeLimit(req));
limits.add(new TimeAllowedLimit(req));
}
if (hasCpuLimit(req)) {
limits.add(new CpuAllowedLimit(req));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
* the {@code timeAllowed} query parameter. Note that timeAllowed will be ignored for
* <strong><em>local</em></strong> processing of sub-queries in cases where the parent query already
* has {@code timeAllowed} set. Essentially only one timeAllowed can be specified for any thread
* executing a query. This is to ensure that subqueies don't escape from the intended limit
* executing a query. This is to ensure that subqueries don't escape from the intended limit
*/
public class SolrQueryTimeLimit implements QueryTimeout {
public class TimeAllowedLimit implements QueryTimeout {

private final long timeoutAt;

Expand All @@ -42,23 +42,25 @@ public class SolrQueryTimeLimit implements QueryTimeout {
* should be validated with {@link #hasTimeLimit(SolrQueryRequest)} prior to constructing this
* object
*/
public SolrQueryTimeLimit(SolrQueryRequest req) {
public TimeAllowedLimit(SolrQueryRequest req) {
// reduce by time already spent
long reqTimeAllowed = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1L);

if (reqTimeAllowed == -1L) {
throw new IllegalArgumentException(
"Check for limit with hasTimeLimit(req) before creating a SolrQueryTimeLimit");
"Check for limit with hasTimeLimit(req) before creating a TimeAllowedLimit");
}
long timeAllowed = reqTimeAllowed - (long) req.getRequestTimer().getTime();
long nanosAllowed = TimeUnit.NANOSECONDS.convert(timeAllowed, TimeUnit.MILLISECONDS);
timeoutAt = nanoTime() + nanosAllowed;
}

/** Return true if the current request has a parameter with a valid value of the limit. */
static boolean hasTimeLimit(SolrQueryRequest req) {
return req.getParams().getLong(CommonParams.TIME_ALLOWED, -1L) >= 0L;
}

/** Return true if a max limit value is set and the current usage has exceeded the limit. */
@Override
public boolean shouldExit() {
return timeoutAt - nanoTime() < 0L;
Expand Down
116 changes: 116 additions & 0 deletions solr/core/src/java/org/apache/solr/util/ThreadCpuTimer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.util;

import java.lang.invoke.MethodHandles;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Allows tracking information about the current thread using the JVM's built-in management bean
* {@link java.lang.management.ThreadMXBean}.
*
* <p>Calling code should create an instance of this class when starting the operation, and then can
* get the {@link #getCpuTimeMs()} at any time thereafter.
*/
public class ThreadCpuTimer {
private static final long UNSUPPORTED = -1;
public static final String CPU_TIME = "cpuTime";
public static final String LOCAL_CPU_TIME = "localCpuTime";
public static final String ENABLE_CPU_TIME = "solr.log.cputime";

private static ThreadMXBean THREAD_MX_BEAN;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

static {
try {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
if (!threadBean.isThreadCpuTimeEnabled()) {
threadBean.setThreadCpuTimeEnabled(true);
}
THREAD_MX_BEAN = threadBean;
} catch (UnsupportedOperationException | SecurityException e) {
THREAD_MX_BEAN = null;
log.info("Thread CPU time monitoring is not available.");
}
}

private final long startCpuTimeNanos;

/**
* Create an instance to track the current thread's usage of CPU. The usage information can later
* be retrieved by any thread by calling {@link #getCpuTimeMs()}.
*/
public ThreadCpuTimer() {
if (THREAD_MX_BEAN != null) {
this.startCpuTimeNanos = THREAD_MX_BEAN.getCurrentThreadCpuTime();
} else {
this.startCpuTimeNanos = UNSUPPORTED;
}
}

public static boolean isSupported() {
return THREAD_MX_BEAN != null;
}

/**
* Return the initial value of CPU time for this thread when this instance was first created.
* NOTE: absolute value returned by this method has no meaning by itself, it should only be used
* when comparing elapsed time between this value and {@link #getCurrentCpuTimeNs()}.
*
* @return current value, or {@link #UNSUPPORTED} if not supported.
*/
public long getStartCpuTimeNs() {
return startCpuTimeNanos;
}

/**
* Return current value of CPU time for this thread.
*
* @return current value, or {@link #UNSUPPORTED} if not supported.
*/
public long getCurrentCpuTimeNs() {
if (THREAD_MX_BEAN != null) {
return this.startCpuTimeNanos != UNSUPPORTED
? THREAD_MX_BEAN.getCurrentThreadCpuTime() - this.startCpuTimeNanos
: UNSUPPORTED;
} else {
return UNSUPPORTED;
}
}

/**
* Get the CPU usage information for the thread that created this {@link ThreadCpuTimer}. The
* information will track the thread's cpu since the creation of this {@link ThreadCpuTimer}
* instance, if the VM's cpu tracking is disabled, returned value will be {@link #UNSUPPORTED}.
*/
public Optional<Long> getCpuTimeMs() {
long cpuTimeNs = getCurrentCpuTimeNs();
return cpuTimeNs != UNSUPPORTED
? Optional.of(TimeUnit.MILLISECONDS.convert(cpuTimeNs, TimeUnit.NANOSECONDS))
: Optional.of(UNSUPPORTED);
}

@Override
public String toString() {
return getCpuTimeMs().toString();
}
}
Loading

0 comments on commit a667bb4

Please sign in to comment.