diff --git a/backends-common/opensearch/pom.xml b/backends-common/opensearch/pom.xml
index 0b08d59bb48..d9d80349dd0 100644
--- a/backends-common/opensearch/pom.xml
+++ b/backends-common/opensearch/pom.xml
@@ -71,6 +71,12 @@
org.apache.commons
commons-configuration2
+
+
+ org.apache.httpcomponents.client5
+ httpclient5
+ 5.3.1
+
org.apache.logging.log4j
log4j-to-slf4j
@@ -83,12 +89,7 @@
org.opensearch.client
opensearch-java
- 2.10.4
-
-
- org.opensearch.client
- opensearch-rest-client
- 2.14.0
+ 2.11.1
org.slf4j
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java
index bc87d19f36d..6f75b14ad7e 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java
@@ -25,6 +25,7 @@
import java.security.cert.CertificateException;
import java.time.Duration;
import java.time.LocalDateTime;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
@@ -35,21 +36,24 @@
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.time.DurationFormatUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.conn.ssl.DefaultHostnameVerifier;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
-import org.apache.http.ssl.SSLContextBuilder;
-import org.apache.http.ssl.TrustStrategy;
+import org.apache.hc.client5.http.auth.AuthScope;
+import org.apache.hc.client5.http.auth.CredentialsStore;
+import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
+import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
+import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
+import org.apache.hc.client5.http.ssl.DefaultHostnameVerifier;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.ssl.SSLContextBuilder;
+import org.apache.hc.core5.ssl.TrustStrategy;
import org.apache.james.util.concurrent.NamedThreadFactory;
-import org.opensearch.client.RestClient;
-import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
-import org.opensearch.client.transport.rest_client.RestClientTransport;
+import org.opensearch.client.transport.OpenSearchTransport;
+import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +64,7 @@
public class ClientProvider implements Provider {
private static class HttpAsyncClientConfigurer {
-
+ private static final AuthScope ANY = new AuthScope(null, null, -1, null, null);
private static final TrustStrategy TRUST_ALL = (x509Certificates, authType) -> true;
private static final HostnameVerifier ACCEPT_ANY_HOSTNAME = (hostname, sslSession) -> true;
@@ -75,9 +79,6 @@ private HttpAsyncClientBuilder configure(HttpAsyncClientBuilder builder) {
configureHostScheme(builder);
configureTimeout(builder);
- configuration.getMaxConnections().ifPresent(builder::setMaxConnTotal);
- configuration.getMaxConnectionsPerHost().ifPresent(builder::setMaxConnPerRoute);
-
builder.setThreadFactory(NamedThreadFactory.withName("OpenSearch-driver"));
return builder;
@@ -99,19 +100,35 @@ private void configureHostScheme(HttpAsyncClientBuilder builder) {
}
private void configureSSLOptions(HttpAsyncClientBuilder builder) {
- try {
- builder
- .setSSLContext(sslContext())
- .setSSLHostnameVerifier(hostnameVerifier());
- } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | CertificateException | IOException e) {
- throw new RuntimeException("Cannot set SSL options to the builder", e);
- }
+ builder.setConnectionManager(connectionManager());
}
private void configureTimeout(HttpAsyncClientBuilder builder) {
builder.setDefaultRequestConfig(requestConfig());
}
+ private PoolingAsyncClientConnectionManager connectionManager() {
+ PoolingAsyncClientConnectionManagerBuilder builder = PoolingAsyncClientConnectionManagerBuilder
+ .create()
+ .setTlsStrategy(tlsStrategy());
+
+ configuration.getMaxConnections().ifPresent(builder::setMaxConnTotal);
+ configuration.getMaxConnectionsPerHost().ifPresent(builder::setMaxConnPerRoute);
+
+ return builder.build();
+ }
+
+ private TlsStrategy tlsStrategy() {
+ try {
+ return ClientTlsStrategyBuilder.create()
+ .setSslContext(sslContext())
+ .setHostnameVerifier(hostnameVerifier())
+ .build();
+ } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | CertificateException | IOException e) {
+ throw new RuntimeException("Cannot set SSL options to the builder", e);
+ }
+ }
+
private SSLContext sslContext() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException,
CertificateException, IOException {
@@ -152,9 +169,9 @@ private HostnameVerifier hostnameVerifier() {
private RequestConfig requestConfig() {
return RequestConfig.custom()
- .setConnectTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()))
- .setConnectionRequestTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()))
- .setSocketTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()))
+ .setConnectTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS)
+ .setConnectionRequestTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS)
+ .setResponseTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS)
.build();
}
@@ -172,9 +189,9 @@ private SSLContextBuilder applyTrustStore(SSLContextBuilder sslContextBuilder) t
private void configureAuthentication(HttpAsyncClientBuilder builder) {
configuration.getCredential()
.ifPresent(credential -> {
- CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(AuthScope.ANY,
- new UsernamePasswordCredentials(credential.getUsername(), String.valueOf(credential.getPassword())));
+ CredentialsStore credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(ANY,
+ new UsernamePasswordCredentials(credential.getUsername(), credential.getPassword()));
builder.setDefaultCredentialsProvider(credentialsProvider);
});
}
@@ -183,7 +200,7 @@ private void configureAuthentication(HttpAsyncClientBuilder builder) {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientProvider.class);
private final OpenSearchConfiguration configuration;
- private final RestClient lowLevelRestClient;
+ private final OpenSearchTransport openSearchTransport;
private final OpenSearchAsyncClient openSearchClient;
private final HttpAsyncClientConfigurer httpAsyncClientConfigurer;
private final ReactorOpenSearchClient client;
@@ -192,13 +209,13 @@ private void configureAuthentication(HttpAsyncClientBuilder builder) {
public ClientProvider(OpenSearchConfiguration configuration) {
this.httpAsyncClientConfigurer = new HttpAsyncClientConfigurer(configuration);
this.configuration = configuration;
- this.lowLevelRestClient = buildRestClient();
+ this.openSearchTransport = buildTransport();
this.openSearchClient = connect();
- this.client = new ReactorOpenSearchClient(this.openSearchClient, lowLevelRestClient);
+ this.client = new ReactorOpenSearchClient(this.openSearchClient);
}
- private RestClient buildRestClient() {
- return RestClient.builder(hostsToHttpHosts())
+ private OpenSearchTransport buildTransport() {
+ return ApacheHttpClient5TransportBuilder.builder(hostsToHttpHosts())
.setHttpClientConfigCallback(httpAsyncClientConfigurer::configure)
.build();
}
@@ -218,14 +235,12 @@ private OpenSearchAsyncClient connect() {
private OpenSearchAsyncClient connectToCluster() {
LOGGER.info("Trying to connect to OpenSearch service at {}", LocalDateTime.now());
- RestClientTransport transport = new RestClientTransport(lowLevelRestClient, new JacksonJsonpMapper());
-
- return new OpenSearchAsyncClient(transport);
+ return new OpenSearchAsyncClient(openSearchTransport);
}
private HttpHost[] hostsToHttpHosts() {
return configuration.getHosts().stream()
- .map(host -> new HttpHost(host.getHostName(), host.getPort(), configuration.getHostScheme().name()))
+ .map(host -> new HttpHost(configuration.getHostScheme().name(), host.getHostName(), host.getPort()))
.toArray(HttpHost[]::new);
}
@@ -236,6 +251,6 @@ public ReactorOpenSearchClient get() {
@PreDestroy
public void close() throws IOException {
- lowLevelRestClient.close();
+ openSearchTransport.close();
}
}
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java
index 05e41ecebec..4b722d3af2a 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java
@@ -22,7 +22,6 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-import org.opensearch.client.RestClient;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
import org.opensearch.client.opensearch.cluster.HealthRequest;
import org.opensearch.client.opensearch.cluster.HealthResponse;
@@ -58,11 +57,9 @@
public class ReactorOpenSearchClient implements AutoCloseable {
private final OpenSearchAsyncClient client;
- private final RestClient lowLevelRestClient;
- public ReactorOpenSearchClient(OpenSearchAsyncClient client, RestClient lowLevelRestClient) {
+ public ReactorOpenSearchClient(OpenSearchAsyncClient client) {
this.client = client;
- this.lowLevelRestClient = lowLevelRestClient;
}
public Mono bulk(BulkRequest bulkRequest) throws IOException {
@@ -81,10 +78,6 @@ public Mono deleteByQuery(DeleteByQueryRequest deleteRequ
return toReactor(client.deleteByQuery(deleteRequest));
}
- public RestClient getLowLevelClient() {
- return lowLevelRestClient;
- }
-
public Mono index(IndexRequest indexRequest) throws IOException {
return toReactor(client.index(indexRequest));
}
@@ -127,7 +120,7 @@ public Mono> get(GetRequest getRequest) throws IOExcepti
@Override
public void close() throws IOException {
- lowLevelRestClient.close();
+ client._transport().close();
}
private static Mono toReactor(CompletableFuture async) {