Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JAMES-3929 Using http5 client instead of opensearch rest client #1648

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions backends-common/opensearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
</dependency>
<!-- Needed for opensearch-java dependency -->
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
Expand All @@ -83,12 +89,7 @@
<dependency>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-java</artifactId>
<version>2.10.4</version>
</dependency>
<dependency>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-rest-client</artifactId>
<version>2.14.0</version>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.security.cert.CertificateException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
Expand All @@ -35,21 +36,24 @@

import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;
import org.apache.hc.client5.http.auth.AuthScope;
import org.apache.hc.client5.http.auth.CredentialsStore;
import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
import org.apache.hc.client5.http.ssl.DefaultHostnameVerifier;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.apache.hc.core5.ssl.TrustStrategy;
import org.apache.james.util.concurrent.NamedThreadFactory;
import org.opensearch.client.RestClient;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -60,7 +64,7 @@
public class ClientProvider implements Provider<ReactorOpenSearchClient> {

private static class HttpAsyncClientConfigurer {

private static final AuthScope ANY = new AuthScope(null, null, -1, null, null);
private static final TrustStrategy TRUST_ALL = (x509Certificates, authType) -> true;
private static final HostnameVerifier ACCEPT_ANY_HOSTNAME = (hostname, sslSession) -> true;

Expand All @@ -75,9 +79,6 @@ private HttpAsyncClientBuilder configure(HttpAsyncClientBuilder builder) {
configureHostScheme(builder);
configureTimeout(builder);

configuration.getMaxConnections().ifPresent(builder::setMaxConnTotal);
configuration.getMaxConnectionsPerHost().ifPresent(builder::setMaxConnPerRoute);

builder.setThreadFactory(NamedThreadFactory.withName("OpenSearch-driver"));

return builder;
Expand All @@ -99,19 +100,35 @@ private void configureHostScheme(HttpAsyncClientBuilder builder) {
}

private void configureSSLOptions(HttpAsyncClientBuilder builder) {
try {
builder
.setSSLContext(sslContext())
.setSSLHostnameVerifier(hostnameVerifier());
} catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | CertificateException | IOException e) {
throw new RuntimeException("Cannot set SSL options to the builder", e);
}
builder.setConnectionManager(connectionManager());
}

private void configureTimeout(HttpAsyncClientBuilder builder) {
builder.setDefaultRequestConfig(requestConfig());
}

private PoolingAsyncClientConnectionManager connectionManager() {
PoolingAsyncClientConnectionManagerBuilder builder = PoolingAsyncClientConnectionManagerBuilder
.create()
.setTlsStrategy(tlsStrategy());

configuration.getMaxConnections().ifPresent(builder::setMaxConnTotal);
configuration.getMaxConnectionsPerHost().ifPresent(builder::setMaxConnPerRoute);

return builder.build();
}

private TlsStrategy tlsStrategy() {
try {
return ClientTlsStrategyBuilder.create()
.setSslContext(sslContext())
.setHostnameVerifier(hostnameVerifier())
.build();
} catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | CertificateException | IOException e) {
throw new RuntimeException("Cannot set SSL options to the builder", e);
}
}

private SSLContext sslContext() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException,
CertificateException, IOException {

Expand Down Expand Up @@ -152,9 +169,9 @@ private HostnameVerifier hostnameVerifier() {

private RequestConfig requestConfig() {
return RequestConfig.custom()
.setConnectTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()))
.setConnectionRequestTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()))
.setSocketTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()))
.setConnectTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS)
.setConnectionRequestTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS)
.setResponseTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS)
.build();
}

Expand All @@ -172,9 +189,9 @@ private SSLContextBuilder applyTrustStore(SSLContextBuilder sslContextBuilder) t
private void configureAuthentication(HttpAsyncClientBuilder builder) {
configuration.getCredential()
.ifPresent(credential -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(credential.getUsername(), String.valueOf(credential.getPassword())));
CredentialsStore credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(ANY,
new UsernamePasswordCredentials(credential.getUsername(), credential.getPassword()));
builder.setDefaultCredentialsProvider(credentialsProvider);
});
}
Expand All @@ -183,7 +200,7 @@ private void configureAuthentication(HttpAsyncClientBuilder builder) {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientProvider.class);

private final OpenSearchConfiguration configuration;
private final RestClient lowLevelRestClient;
private final OpenSearchTransport openSearchTransport;
private final OpenSearchAsyncClient openSearchClient;
private final HttpAsyncClientConfigurer httpAsyncClientConfigurer;
private final ReactorOpenSearchClient client;
Expand All @@ -192,13 +209,13 @@ private void configureAuthentication(HttpAsyncClientBuilder builder) {
public ClientProvider(OpenSearchConfiguration configuration) {
this.httpAsyncClientConfigurer = new HttpAsyncClientConfigurer(configuration);
this.configuration = configuration;
this.lowLevelRestClient = buildRestClient();
this.openSearchTransport = buildTransport();
this.openSearchClient = connect();
this.client = new ReactorOpenSearchClient(this.openSearchClient, lowLevelRestClient);
this.client = new ReactorOpenSearchClient(this.openSearchClient);
}

private RestClient buildRestClient() {
return RestClient.builder(hostsToHttpHosts())
private OpenSearchTransport buildTransport() {
return ApacheHttpClient5TransportBuilder.builder(hostsToHttpHosts())
.setHttpClientConfigCallback(httpAsyncClientConfigurer::configure)
.build();
}
Expand All @@ -218,14 +235,12 @@ private OpenSearchAsyncClient connect() {
private OpenSearchAsyncClient connectToCluster() {
LOGGER.info("Trying to connect to OpenSearch service at {}", LocalDateTime.now());

RestClientTransport transport = new RestClientTransport(lowLevelRestClient, new JacksonJsonpMapper());

return new OpenSearchAsyncClient(transport);
return new OpenSearchAsyncClient(openSearchTransport);
}

private HttpHost[] hostsToHttpHosts() {
return configuration.getHosts().stream()
.map(host -> new HttpHost(host.getHostName(), host.getPort(), configuration.getHostScheme().name()))
.map(host -> new HttpHost(configuration.getHostScheme().name(), host.getHostName(), host.getPort()))
.toArray(HttpHost[]::new);
}

Expand All @@ -236,6 +251,6 @@ public ReactorOpenSearchClient get() {

@PreDestroy
public void close() throws IOException {
lowLevelRestClient.close();
openSearchTransport.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

import org.opensearch.client.RestClient;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
import org.opensearch.client.opensearch.cluster.HealthRequest;
import org.opensearch.client.opensearch.cluster.HealthResponse;
Expand Down Expand Up @@ -58,11 +57,9 @@

public class ReactorOpenSearchClient implements AutoCloseable {
private final OpenSearchAsyncClient client;
private final RestClient lowLevelRestClient;

public ReactorOpenSearchClient(OpenSearchAsyncClient client, RestClient lowLevelRestClient) {
public ReactorOpenSearchClient(OpenSearchAsyncClient client) {
this.client = client;
this.lowLevelRestClient = lowLevelRestClient;
}

public Mono<BulkResponse> bulk(BulkRequest bulkRequest) throws IOException {
Expand All @@ -81,10 +78,6 @@ public Mono<DeleteByQueryResponse> deleteByQuery(DeleteByQueryRequest deleteRequ
return toReactor(client.deleteByQuery(deleteRequest));
}

public RestClient getLowLevelClient() {
return lowLevelRestClient;
}

public <T> Mono<IndexResponse> index(IndexRequest<T> indexRequest) throws IOException {
return toReactor(client.index(indexRequest));
}
Expand Down Expand Up @@ -127,7 +120,7 @@ public Mono<GetResponse<ObjectNode>> get(GetRequest getRequest) throws IOExcepti

@Override
public void close() throws IOException {
lowLevelRestClient.close();
client._transport().close();
}

private static <T> Mono<T> toReactor(CompletableFuture<T> async) {
Expand Down