-
Notifications
You must be signed in to change notification settings - Fork 686
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SOLR-17158 Terminate distributed processing quickly when query limit is reached - Initial impl #2379
SOLR-17158 Terminate distributed processing quickly when query limit is reached - Initial impl #2379
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,9 @@ | |
*/ | ||
package org.apache.solr.handler.component; | ||
|
||
import static org.apache.solr.common.params.CommonParams.ALLOW_PARTIAL_RESULTS; | ||
import static org.apache.solr.request.SolrQueryRequest.shouldDiscardPartials; | ||
|
||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
@@ -43,13 +46,25 @@ | |
import org.apache.solr.common.params.ShardParams; | ||
import org.apache.solr.common.params.SolrParams; | ||
import org.apache.solr.common.util.NamedList; | ||
import org.apache.solr.common.util.StrUtils; | ||
import org.apache.solr.core.CoreDescriptor; | ||
import org.apache.solr.request.SolrQueryRequest; | ||
import org.apache.solr.request.SolrRequestInfo; | ||
import org.apache.solr.security.AllowListUrlChecker; | ||
|
||
@NotThreadSafe | ||
public class HttpShardHandler extends ShardHandler { | ||
/** | ||
* Throw an error from search requests when the {@value ShardParams#SHARDS_TOLERANT} param has | ||
* this value and ZooKeeper is not connected. | ||
* | ||
* @see #getShardsTolerantAsBool(SolrQueryRequest) | ||
*/ | ||
public static final String REQUIRE_ZK_CONNECTED = "requireZkConnected"; | ||
|
||
/** */ | ||
private final Object RESPONSE_CANCELABLE_LOCK = new Object(); | ||
|
||
/** | ||
* If the request context map has an entry with this key and Boolean.TRUE as value, {@link | ||
* #prepDistributed(ResponseBuilder)} will only include {@link | ||
|
@@ -63,7 +78,14 @@ public class HttpShardHandler extends ShardHandler { | |
private HttpShardHandlerFactory httpShardHandlerFactory; | ||
private Map<ShardResponse, Cancellable> responseCancellableMap; | ||
private BlockingQueue<ShardResponse> responses; | ||
|
||
/** | ||
* The number of pending requests. This must be incremented before a {@link ShardResponse} is | ||
* added to {@link #responses}, and decremented after a ShardResponse is removed from {@code | ||
* responses}. We cannot rely on responses.size() bec | ||
*/ | ||
private AtomicInteger pending; | ||
|
||
private Map<String, List<String>> shardToURLs; | ||
private LBHttp2SolrClient lbClient; | ||
|
||
|
@@ -81,6 +103,37 @@ public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) { | |
shardToURLs = new HashMap<>(); | ||
} | ||
|
||
/** | ||
* Parse the {@value ShardParams#SHARDS_TOLERANT} param from <code>params</code> as a boolean; | ||
* accepts {@value #REQUIRE_ZK_CONNECTED} as a valid value indicating <code>false</code>. | ||
* | ||
* <p>By default, returns <code>false</code> when {@value ShardParams#SHARDS_TOLERANT} is not set | ||
* in <code> | ||
* params</code>. | ||
*/ | ||
public static boolean getShardsTolerantAsBool(SolrQueryRequest req) { | ||
String shardsTolerantValue = req.getParams().get(ShardParams.SHARDS_TOLERANT); | ||
if (null == shardsTolerantValue || shardsTolerantValue.equals(REQUIRE_ZK_CONNECTED)) { | ||
return false; | ||
} else { | ||
boolean tolerant = StrUtils.parseBool(shardsTolerantValue); | ||
if (tolerant && shouldDiscardPartials(req.getParams())) { | ||
throw new SolrException( | ||
SolrException.ErrorCode.BAD_REQUEST, | ||
"Use of " | ||
+ ShardParams.SHARDS_TOLERANT | ||
+ " requires that " | ||
+ ALLOW_PARTIAL_RESULTS | ||
+ " is true. If " | ||
+ ALLOW_PARTIAL_RESULTS | ||
+ " is defaulted to false explicitly passing " | ||
+ ALLOW_PARTIAL_RESULTS | ||
+ "=true in the request will allow shards.tolerant to work"); | ||
} | ||
return tolerant; // throw an exception if non-boolean | ||
} | ||
} | ||
|
||
private static class SimpleSolrResponse extends SolrResponse { | ||
|
||
volatile long elapsedTime; | ||
|
@@ -140,58 +193,64 @@ public void submit( | |
srsp.setShard(shard); | ||
SimpleSolrResponse ssr = new SimpleSolrResponse(); | ||
srsp.setSolrResponse(ssr); | ||
synchronized (RESPONSE_CANCELABLE_LOCK) { | ||
pending.incrementAndGet(); | ||
// if there are no shards available for a slice, urls.size()==0 | ||
if (urls.isEmpty()) { | ||
// TODO: what's the right error code here? We should use the same thing when | ||
// all of the servers for a shard are down. | ||
SolrException exception = | ||
new SolrException( | ||
SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard); | ||
srsp.setException(exception); | ||
srsp.setResponseCode(exception.code()); | ||
responses.add(srsp); | ||
return; | ||
} | ||
|
||
pending.incrementAndGet(); | ||
// if there are no shards available for a slice, urls.size()==0 | ||
if (urls.isEmpty()) { | ||
// TODO: what's the right error code here? We should use the same thing when | ||
// all of the servers for a shard are down. | ||
SolrException exception = | ||
new SolrException( | ||
SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard); | ||
srsp.setException(exception); | ||
srsp.setResponseCode(exception.code()); | ||
responses.add(srsp); | ||
return; | ||
} | ||
// all variables that set inside this listener must be at least volatile | ||
responseCancellableMap.put( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe if we used a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't feel comfortable with that solution because this map is updated while looping on an Atomic integer |
||
srsp, | ||
this.lbClient.asyncReq( | ||
lbReq, | ||
new AsyncListener<>() { | ||
volatile long startTime = System.nanoTime(); | ||
|
||
@Override | ||
public void onStart() { | ||
// Reminder: this is called in the parent thread, not in the request execution | ||
// thread. | ||
SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo(); | ||
if (requestInfo != null) | ||
req.setUserPrincipal(requestInfo.getReq().getUserPrincipal()); | ||
} | ||
|
||
@Override | ||
public void onSuccess(LBSolrClient.Rsp rsp) { | ||
ssr.nl = rsp.getResponse(); | ||
srsp.setShardAddress(rsp.getServer()); | ||
ssr.elapsedTime = | ||
TimeUnit.MILLISECONDS.convert( | ||
System.nanoTime() - startTime, TimeUnit.NANOSECONDS); | ||
responses.add(srsp); | ||
} | ||
|
||
// all variables that set inside this listener must be at least volatile | ||
responseCancellableMap.put( | ||
srsp, | ||
this.lbClient.asyncReq( | ||
lbReq, | ||
new AsyncListener<>() { | ||
volatile long startTime = System.nanoTime(); | ||
|
||
@Override | ||
public void onStart() { | ||
SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo(); | ||
if (requestInfo != null) | ||
req.setUserPrincipal(requestInfo.getReq().getUserPrincipal()); | ||
} | ||
|
||
@Override | ||
public void onSuccess(LBSolrClient.Rsp rsp) { | ||
ssr.nl = rsp.getResponse(); | ||
srsp.setShardAddress(rsp.getServer()); | ||
ssr.elapsedTime = | ||
TimeUnit.MILLISECONDS.convert( | ||
System.nanoTime() - startTime, TimeUnit.NANOSECONDS); | ||
responses.add(srsp); | ||
} | ||
|
||
@Override | ||
public void onFailure(Throwable throwable) { | ||
ssr.elapsedTime = | ||
TimeUnit.MILLISECONDS.convert( | ||
System.nanoTime() - startTime, TimeUnit.NANOSECONDS); | ||
srsp.setException(throwable); | ||
if (throwable instanceof SolrException) { | ||
srsp.setResponseCode(((SolrException) throwable).code()); | ||
@Override | ||
public void onFailure(Throwable throwable) { | ||
ssr.elapsedTime = | ||
TimeUnit.MILLISECONDS.convert( | ||
System.nanoTime() - startTime, TimeUnit.NANOSECONDS); | ||
srsp.setException(throwable); | ||
if (throwable instanceof SolrException) { | ||
srsp.setResponseCode(((SolrException) throwable).code()); | ||
} | ||
responses.add(srsp); | ||
if (shouldDiscardPartials(params)) { | ||
cancelAll(); | ||
} | ||
} | ||
responses.add(srsp); | ||
} | ||
})); | ||
})); | ||
} | ||
} | ||
|
||
/** Subclasses could modify the request based on the shard */ | ||
|
@@ -202,7 +261,7 @@ protected QueryRequest makeQueryRequest( | |
} | ||
|
||
/** Subclasses could modify the Response based on the shard */ | ||
protected ShardResponse transfomResponse( | ||
protected ShardResponse transformResponse( | ||
final ShardRequest sreq, ShardResponse rsp, String shard) { | ||
return rsp; | ||
} | ||
|
@@ -227,18 +286,28 @@ public ShardResponse takeCompletedOrError() { | |
|
||
private ShardResponse take(boolean bailOnError) { | ||
try { | ||
// although nothing in this class guarantees that pending has been incremented to the total | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @CaoManhDat if you can double check my reading of this code and let me know if this comment is inaccurate in any way I'd appreciate it. (I see your name in the git blame a bunch here) |
||
// number of expected requests, actual usage in SearchHandler results in this method never | ||
// being called until all requests have been added in a prior loop over | ||
// ShardRequest.actualShards in the same thread that invokes take() (I haven't checked but | ||
// hopefully other handlers do the same) The net effect is we shouldn't arrive here with | ||
// pending < ShardRequest.actualShards.size() | ||
while (pending.get() > 0) { | ||
ShardResponse rsp = responses.take(); | ||
responseCancellableMap.remove(rsp); | ||
|
||
pending.decrementAndGet(); | ||
ShardResponse rsp; | ||
synchronized (RESPONSE_CANCELABLE_LOCK) { | ||
rsp = responses.take(); | ||
responseCancellableMap.remove(rsp); | ||
pending.decrementAndGet(); | ||
} | ||
if (bailOnError && rsp.getException() != null) | ||
return rsp; // if exception, return immediately | ||
// add response to the response list... we do this after the take() and | ||
// not after the completion of "call" so we know when the last response | ||
// for a request was received. Otherwise we might return the same | ||
// request more than once. | ||
rsp.getShardRequest().responses.add(rsp); | ||
|
||
if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) { | ||
return rsp; | ||
} | ||
|
@@ -251,11 +320,13 @@ private ShardResponse take(boolean bailOnError) { | |
|
||
@Override | ||
public void cancelAll() { | ||
for (Cancellable cancellable : responseCancellableMap.values()) { | ||
cancellable.cancel(); | ||
pending.decrementAndGet(); | ||
synchronized (RESPONSE_CANCELABLE_LOCK) { | ||
for (Cancellable cancellable : responseCancellableMap.values()) { | ||
cancellable.cancel(); | ||
pending.decrementAndGet(); | ||
} | ||
responseCancellableMap.clear(); | ||
} | ||
responseCancellableMap.clear(); | ||
} | ||
|
||
@Override | ||
|
@@ -310,7 +381,7 @@ public void prepDistributed(ResponseBuilder rb) { | |
// be an optimization? | ||
} | ||
|
||
if (!ShardParams.getShardsTolerantAsBool(params)) { | ||
if (!getShardsTolerantAsBool(req)) { | ||
for (int i = 0; i < rb.slices.length; i++) { | ||
if (replicaSource.getReplicasBySlice(i).isEmpty()) { | ||
final ReplicaSource allActiveReplicaSource = | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer you not bother with all the machinery in solr.xml/NodeConfig and instead have a simple EnvUtils (env or sys prop) to enable. It's just a boolean; doesn't have any interesting config to it. Yes there are other booleans here but I think we Solr maintainers should consider how NodeConfig might systematically / dynamically understand primitive values (boolean, integer, ...) and apply to EnvUtils automatically without having to touch the NodeConfig class (which is kind of a pain; that builder!). For example imagine "solr.search.partialResults" being settable as a system property, or also settable in solr.xml automatically via "searchPartialResults".
CC @janhoy
Another take on this is that, shouldn't the user simply go to their solrconfig.xml and set a default partialResults like we all do for miscellaneous things for search?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, IMHO having a per-collection setting instead of a global sysprop is more flexible (and you can still change it globally using property substitution).