Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-17211: HttpJdkSolrClient Support Async requests #2374

Merged
merged 19 commits into from
Apr 3, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ public void send(OutStream outStream, SolrRequest<?> req, String collection) thr

private static final Cancellable FAILED_MAKING_REQUEST_CANCELLABLE = () -> {};

@Override
public Cancellable asyncRequest(
SolrRequest<?> solrRequest,
String collection,
Expand Down Expand Up @@ -470,7 +471,7 @@ public void onFailure(Response response, Throwable failure) {
}
}
};

asyncListener.onStart();
req = makeRequestAndSend(solrRequest, url, listener, true);
} catch (SolrServerException | IOException e) {
asyncListener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -51,6 +52,8 @@
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.util.AsyncListener;
import org.apache.solr.client.solrj.util.Cancellable;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
Expand Down Expand Up @@ -80,7 +83,7 @@ public class HttpJdkSolrClient extends HttpSolrClientBase {

private boolean forceHttp11;

private boolean shutdownExecutor;
private final boolean shutdownExecutor;

protected HttpJdkSolrClient(String serverBaseUrl, HttpJdkSolrClient.Builder builder) {
super(serverBaseUrl, builder);
Expand Down Expand Up @@ -133,80 +136,133 @@ protected HttpJdkSolrClient(String serverBaseUrl, HttpJdkSolrClient.Builder buil
assert ObjectReleaseTracker.track(this);
}

@Override
public Cancellable asyncRequest(
SolrRequest<?> solrRequest,
String collection,
AsyncListener<NamedList<Object>> asyncListener) {
try {
PreparedRequest pReq = prepareRequest(solrRequest, collection);
asyncListener.onStart();
CompletableFuture<NamedList<Object>> response =
httpClient
.sendAsync(pReq.reqb.build(), HttpResponse.BodyHandlers.ofInputStream())
.thenApply(
httpResponse -> {
try {
return processErrorsAndResponse(
solrRequest, pReq.parserToUse, httpResponse, pReq.url);
} catch (SolrServerException e) {
throw new RuntimeException(e);
}
})
.whenComplete(
(nl, t) -> {
if (t != null) {
asyncListener.onFailure(t);
} else {
asyncListener.onSuccess(nl);
}
});
return new HttpSolrClientCancellable(response);
} catch (Exception e) {
asyncListener.onFailure(e);
return () -> {};
}
}

@Override
public NamedList<Object> request(SolrRequest<?> solrRequest, String collection)
throws SolrServerException, IOException {
PreparedRequest pReq = prepareRequest(solrRequest, collection);
HttpResponse<InputStream> response = null;
try {
response = httpClient.send(pReq.reqb.build(), HttpResponse.BodyHandlers.ofInputStream());
return processErrorsAndResponse(solrRequest, pReq.parserToUse, response, pReq.url);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (HttpTimeoutException e) {
throw new SolrServerException(
"Timeout occurred while waiting response from server at: " + pReq.url, e);
} catch (SolrException se) {
throw se;
} catch (RuntimeException re) {
throw new SolrServerException(re);
} finally {
if (pReq.contentWritingFuture != null) {
pReq.contentWritingFuture.cancel(true);
}

// See
// https://docs.oracle.com/en/java/javase/17/docs/api/java.net.http/java/net/http/HttpResponse.BodySubscribers.html#ofInputStream()
if (!wantStream(pReq.parserToUse)) {
try {
response.body().close();
} catch (Exception e1) {
// ignore
}
}
}
}

private PreparedRequest prepareRequest(SolrRequest<?> solrRequest, String collection)
throws SolrServerException, IOException {
checkClosed();
if (ClientUtils.shouldApplyDefaultCollection(collection, solrRequest)) {
collection = defaultCollection;
}
String url = getRequestPath(solrRequest, collection);
ResponseParser parserToUse = responseParser(solrRequest);
ModifiableSolrParams queryParams = initalizeSolrParams(solrRequest, parserToUse);
HttpResponse<InputStream> resp = null;
var reqb = HttpRequest.newBuilder();
PreparedRequest pReq = null;
try {
var reqb = HttpRequest.newBuilder();
switch (solrRequest.getMethod()) {
case GET:
{
resp = doGet(url, reqb, solrRequest, queryParams);
pReq = prepareGet(url, reqb, solrRequest, queryParams);
break;
}
case POST:
case PUT:
{
resp = doPutOrPost(url, solrRequest.getMethod(), reqb, solrRequest, queryParams);
pReq = preparePutOrPost(url, solrRequest.getMethod(), reqb, solrRequest, queryParams);
break;
}
default:
{
throw new IllegalStateException("Unsupported method: " + solrRequest.getMethod());
}
}
return processErrorsAndResponse(solrRequest, parserToUse, resp, url);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (HttpTimeoutException e) {
throw new SolrServerException(
"Timeout occurred while waiting response from server at: " + url, e);
} catch (SolrException se) {
throw se;
} catch (URISyntaxException | RuntimeException re) {
throw new SolrServerException(re);
} finally {
// See
// https://docs.oracle.com/en/java/javase/17/docs/api/java.net.http/java/net/http/HttpResponse.BodySubscribers.html#ofInputStream()
if (!wantStream(parserToUse)) {
try {
resp.body().close();
} catch (Exception e1) {
// ignore
}
}
}
pReq.parserToUse = parserToUse;
pReq.url = url;
return pReq;
}

private HttpResponse<InputStream> doGet(
private PreparedRequest prepareGet(
String url,
HttpRequest.Builder reqb,
SolrRequest<?> solrRequest,
ModifiableSolrParams queryParams)
throws IOException, InterruptedException, URISyntaxException {
throws IOException, URISyntaxException {
validateGetRequest(solrRequest);
reqb.GET();
decorateRequest(reqb, solrRequest);
reqb.uri(new URI(url + "?" + queryParams));
return httpClient.send(reqb.build(), HttpResponse.BodyHandlers.ofInputStream());
return new PreparedRequest(reqb, null);
}

private HttpResponse<InputStream> doPutOrPost(
private PreparedRequest preparePutOrPost(
String url,
SolrRequest.METHOD method,
HttpRequest.Builder reqb,
SolrRequest<?> solrRequest,
ModifiableSolrParams queryParams)
throws IOException, InterruptedException, URISyntaxException {
throws IOException, URISyntaxException {

final RequestWriter.ContentWriter contentWriter = requestWriter.getContentWriter(solrRequest);

Expand Down Expand Up @@ -274,15 +330,21 @@ private HttpResponse<InputStream> doPutOrPost(
URI uriWithQueryParams = new URI(url + "?" + queryParams);
reqb.uri(uriWithQueryParams);

HttpResponse<InputStream> response;
try {
response = httpClient.send(reqb.build(), HttpResponse.BodyHandlers.ofInputStream());
} finally {
if (contentWritingFuture != null) {
contentWritingFuture.cancel(true);
}
return new PreparedRequest(reqb, contentWritingFuture);
}

private static class PreparedRequest {
Future<?> contentWritingFuture;
HttpRequest.Builder reqb;

ResponseParser parserToUse;

String url;

PreparedRequest(HttpRequest.Builder reqb, Future<?> contentWritingFuture) {
this.reqb = reqb;
this.contentWritingFuture = contentWritingFuture;
}
return response;
}

/**
Expand Down Expand Up @@ -469,6 +531,23 @@ protected String allProcessorSupportedContentTypesCommaDelimited(
.collect(Collectors.joining(", "));
}

protected static class HttpSolrClientCancellable implements Cancellable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seeing this class, to me, really underscores a mismatch between Solr's needless Async API helper classes (Cancellable is one), and using a CompletableFuture directly. See the parent JIRA issue where I direct you to a previous stalled effort that could be resumed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I fully agree we should deprecate and replace the existing method, but we will need to keep the old API around for 9.x. I am interested in taking a closer look at SOLR-14763 and I appreciate your pointing me to that. But in any case I would like to do that separately from this issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. Note the old API is on exactly one class a user might use; it's being expanded here. But at least it's only other Http subclasses; isn't spreading further. If you can mark these methods Deprecated here (to be replaced by something coming (soon?)), people will be forewarned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not want to deprecate anything unless there is a replacement. But I hear and share your concern, and would like to make the API change, just not as part of this PR.

private final CompletableFuture<NamedList<Object>> response;

protected HttpSolrClientCancellable(CompletableFuture<NamedList<Object>> response) {
this.response = response;
}

@Override
public void cancel() {
response.cancel(true);
}

protected CompletableFuture<NamedList<Object>> getResponse() {
return response;
}
}

public static class Builder
extends HttpSolrClientBuilderBase<HttpJdkSolrClient.Builder, HttpJdkSolrClient> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.util.AsyncListener;
import org.apache.solr.client.solrj.util.Cancellable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
Expand Down Expand Up @@ -368,6 +370,20 @@ protected void setParser(ResponseParser parser) {

protected abstract void updateDefaultMimeTypeForParser();

/**
* Execute an asynchronous request to a Solr collection
*
* @param solrRequest the request to perform
* @param collection if null the default collection is used
* @param asyncListener callers should provide an implementation to handle events: start, success,
* exception
* @return Cancellable allowing the caller to attempt cancellation
*/
public abstract Cancellable asyncRequest(
SolrRequest<?> solrRequest,
String collection,
AsyncListener<NamedList<Object>> asyncListener);

public boolean isV2ApiRequest(final SolrRequest<?> request) {
return request instanceof V2Request || request.getPath().contains("/____v2");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@

package org.apache.solr.client.solrj.util;

/** Listener for async requests */
/**
* Listener for async requests
*
* @param <T> The result type returned by the {@code onSuccess} method
*/
public interface AsyncListener<T> {
/** Callback method invoked before processing the request */
default void onStart() {}

/** Callback method invoked when the request completes successfully */
void onSuccess(T t);

/** Callback method invoked when the request completes in failure */
void onFailure(Throwable throwable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@

package org.apache.solr.client.solrj.util;

/**
* The return type for solrJ asynchronous requests, providing a mechanism whereby callers may
* request cancellation.
*/
public interface Cancellable {

/**
* Request to cancel the asynchronous request. This may be a no-op in some situations, for
* instance, if the request failed or otherwise is complete.
*/
void cancel();
}
Loading
Loading