From b88ee38582b84e812dbf4906bd51b8d46d8e5d57 Mon Sep 17 00:00:00 2001 From: Rene Cordier Date: Wed, 3 Jul 2024 15:23:59 +0700 Subject: [PATCH 1/2] [Update] opensearch-java 2.10.4 => 2.11.1 --- backends-common/opensearch/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends-common/opensearch/pom.xml b/backends-common/opensearch/pom.xml index 0b08d59bb48..5397887d9ed 100644 --- a/backends-common/opensearch/pom.xml +++ b/backends-common/opensearch/pom.xml @@ -83,7 +83,7 @@ org.opensearch.client opensearch-java - 2.10.4 + 2.11.1 org.opensearch.client From ce7c6d8f91ed06bb32ac1652379278dc59ec1af1 Mon Sep 17 00:00:00 2001 From: Rene Cordier Date: Wed, 3 Jul 2024 15:36:34 +0700 Subject: [PATCH 2/2] [Update] Using http5 client instead of opensearch rest client --- backends-common/opensearch/pom.xml | 11 ++- .../backends/opensearch/ClientProvider.java | 95 +++++++++++-------- .../opensearch/ReactorOpenSearchClient.java | 11 +-- 3 files changed, 63 insertions(+), 54 deletions(-) diff --git a/backends-common/opensearch/pom.xml b/backends-common/opensearch/pom.xml index 5397887d9ed..d9d80349dd0 100644 --- a/backends-common/opensearch/pom.xml +++ b/backends-common/opensearch/pom.xml @@ -71,6 +71,12 @@ org.apache.commons commons-configuration2 + + + org.apache.httpcomponents.client5 + httpclient5 + 5.3.1 + org.apache.logging.log4j log4j-to-slf4j @@ -85,11 +91,6 @@ opensearch-java 2.11.1 - - org.opensearch.client - opensearch-rest-client - 2.14.0 - org.slf4j jcl-over-slf4j diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java index bc87d19f36d..6f75b14ad7e 100644 --- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java +++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java @@ -25,6 +25,7 @@ import java.security.cert.CertificateException; import java.time.Duration; import java.time.LocalDateTime; +import java.util.concurrent.TimeUnit; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; @@ -35,21 +36,24 @@ import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.time.DurationFormatUtils; -import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.conn.ssl.DefaultHostnameVerifier; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; -import org.apache.http.ssl.SSLContextBuilder; -import org.apache.http.ssl.TrustStrategy; +import org.apache.hc.client5.http.auth.AuthScope; +import org.apache.hc.client5.http.auth.CredentialsStore; +import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder; +import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; +import org.apache.hc.client5.http.ssl.DefaultHostnameVerifier; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.ssl.SSLContextBuilder; +import org.apache.hc.core5.ssl.TrustStrategy; import org.apache.james.util.concurrent.NamedThreadFactory; -import org.opensearch.client.RestClient; -import org.opensearch.client.json.jackson.JacksonJsonpMapper; import org.opensearch.client.opensearch.OpenSearchAsyncClient; -import org.opensearch.client.transport.rest_client.RestClientTransport; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +64,7 @@ public class ClientProvider implements Provider { private static class HttpAsyncClientConfigurer { - + private static final AuthScope ANY = new AuthScope(null, null, -1, null, null); private static final TrustStrategy TRUST_ALL = (x509Certificates, authType) -> true; private static final HostnameVerifier ACCEPT_ANY_HOSTNAME = (hostname, sslSession) -> true; @@ -75,9 +79,6 @@ private HttpAsyncClientBuilder configure(HttpAsyncClientBuilder builder) { configureHostScheme(builder); configureTimeout(builder); - configuration.getMaxConnections().ifPresent(builder::setMaxConnTotal); - configuration.getMaxConnectionsPerHost().ifPresent(builder::setMaxConnPerRoute); - builder.setThreadFactory(NamedThreadFactory.withName("OpenSearch-driver")); return builder; @@ -99,19 +100,35 @@ private void configureHostScheme(HttpAsyncClientBuilder builder) { } private void configureSSLOptions(HttpAsyncClientBuilder builder) { - try { - builder - .setSSLContext(sslContext()) - .setSSLHostnameVerifier(hostnameVerifier()); - } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | CertificateException | IOException e) { - throw new RuntimeException("Cannot set SSL options to the builder", e); - } + builder.setConnectionManager(connectionManager()); } private void configureTimeout(HttpAsyncClientBuilder builder) { builder.setDefaultRequestConfig(requestConfig()); } + private PoolingAsyncClientConnectionManager connectionManager() { + PoolingAsyncClientConnectionManagerBuilder builder = PoolingAsyncClientConnectionManagerBuilder + .create() + .setTlsStrategy(tlsStrategy()); + + configuration.getMaxConnections().ifPresent(builder::setMaxConnTotal); + configuration.getMaxConnectionsPerHost().ifPresent(builder::setMaxConnPerRoute); + + return builder.build(); + } + + private TlsStrategy tlsStrategy() { + try { + return ClientTlsStrategyBuilder.create() + .setSslContext(sslContext()) + .setHostnameVerifier(hostnameVerifier()) + .build(); + } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | CertificateException | IOException e) { + throw new RuntimeException("Cannot set SSL options to the builder", e); + } + } + private SSLContext sslContext() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException, CertificateException, IOException { @@ -152,9 +169,9 @@ private HostnameVerifier hostnameVerifier() { private RequestConfig requestConfig() { return RequestConfig.custom() - .setConnectTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis())) - .setConnectionRequestTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis())) - .setSocketTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis())) + .setConnectTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS) + .setConnectionRequestTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS) + .setResponseTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS) .build(); } @@ -172,9 +189,9 @@ private SSLContextBuilder applyTrustStore(SSLContextBuilder sslContextBuilder) t private void configureAuthentication(HttpAsyncClientBuilder builder) { configuration.getCredential() .ifPresent(credential -> { - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(credential.getUsername(), String.valueOf(credential.getPassword()))); + CredentialsStore credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(ANY, + new UsernamePasswordCredentials(credential.getUsername(), credential.getPassword())); builder.setDefaultCredentialsProvider(credentialsProvider); }); } @@ -183,7 +200,7 @@ private void configureAuthentication(HttpAsyncClientBuilder builder) { private static final Logger LOGGER = LoggerFactory.getLogger(ClientProvider.class); private final OpenSearchConfiguration configuration; - private final RestClient lowLevelRestClient; + private final OpenSearchTransport openSearchTransport; private final OpenSearchAsyncClient openSearchClient; private final HttpAsyncClientConfigurer httpAsyncClientConfigurer; private final ReactorOpenSearchClient client; @@ -192,13 +209,13 @@ private void configureAuthentication(HttpAsyncClientBuilder builder) { public ClientProvider(OpenSearchConfiguration configuration) { this.httpAsyncClientConfigurer = new HttpAsyncClientConfigurer(configuration); this.configuration = configuration; - this.lowLevelRestClient = buildRestClient(); + this.openSearchTransport = buildTransport(); this.openSearchClient = connect(); - this.client = new ReactorOpenSearchClient(this.openSearchClient, lowLevelRestClient); + this.client = new ReactorOpenSearchClient(this.openSearchClient); } - private RestClient buildRestClient() { - return RestClient.builder(hostsToHttpHosts()) + private OpenSearchTransport buildTransport() { + return ApacheHttpClient5TransportBuilder.builder(hostsToHttpHosts()) .setHttpClientConfigCallback(httpAsyncClientConfigurer::configure) .build(); } @@ -218,14 +235,12 @@ private OpenSearchAsyncClient connect() { private OpenSearchAsyncClient connectToCluster() { LOGGER.info("Trying to connect to OpenSearch service at {}", LocalDateTime.now()); - RestClientTransport transport = new RestClientTransport(lowLevelRestClient, new JacksonJsonpMapper()); - - return new OpenSearchAsyncClient(transport); + return new OpenSearchAsyncClient(openSearchTransport); } private HttpHost[] hostsToHttpHosts() { return configuration.getHosts().stream() - .map(host -> new HttpHost(host.getHostName(), host.getPort(), configuration.getHostScheme().name())) + .map(host -> new HttpHost(configuration.getHostScheme().name(), host.getHostName(), host.getPort())) .toArray(HttpHost[]::new); } @@ -236,6 +251,6 @@ public ReactorOpenSearchClient get() { @PreDestroy public void close() throws IOException { - lowLevelRestClient.close(); + openSearchTransport.close(); } } diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java index 05e41ecebec..4b722d3af2a 100644 --- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java +++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; -import org.opensearch.client.RestClient; import org.opensearch.client.opensearch.OpenSearchAsyncClient; import org.opensearch.client.opensearch.cluster.HealthRequest; import org.opensearch.client.opensearch.cluster.HealthResponse; @@ -58,11 +57,9 @@ public class ReactorOpenSearchClient implements AutoCloseable { private final OpenSearchAsyncClient client; - private final RestClient lowLevelRestClient; - public ReactorOpenSearchClient(OpenSearchAsyncClient client, RestClient lowLevelRestClient) { + public ReactorOpenSearchClient(OpenSearchAsyncClient client) { this.client = client; - this.lowLevelRestClient = lowLevelRestClient; } public Mono bulk(BulkRequest bulkRequest) throws IOException { @@ -81,10 +78,6 @@ public Mono deleteByQuery(DeleteByQueryRequest deleteRequ return toReactor(client.deleteByQuery(deleteRequest)); } - public RestClient getLowLevelClient() { - return lowLevelRestClient; - } - public Mono index(IndexRequest indexRequest) throws IOException { return toReactor(client.index(indexRequest)); } @@ -127,7 +120,7 @@ public Mono> get(GetRequest getRequest) throws IOExcepti @Override public void close() throws IOException { - lowLevelRestClient.close(); + client._transport().close(); } private static Mono toReactor(CompletableFuture async) {