Skip to content

Commit

Permalink
Refactor clients with supplyAsync, handle not found on get
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed May 17, 2024
1 parent b1ac61d commit 7d03c1e
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 215 deletions.
22 changes: 1 addition & 21 deletions plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,30 +66,10 @@ dependencies {
implementation group: 'org.apache.commons', name: 'commons-text', version: '1.10.0'

implementation "org.opensearch.client:opensearch-java:2.10.2"
implementation 'software.amazon.awssdk:apache-client:2.25.50'
implementation 'software.amazon.awssdk:http-client-spi:2.25.50'
implementation 'software.amazon.awssdk:regions:2.25.50'
implementation 'software.amazon.awssdk:utils:2.25.50'


checkstyle "com.puppycrawl.tools:checkstyle:${project.checkstyle.toolVersion}"

configurations.all {
resolutionStrategy.force 'software.amazon.awssdk:apache-client:2.25.50'
resolutionStrategy.force 'software.amazon.awssdk:http-client-spi:2.25.50'
resolutionStrategy.force 'software.amazon.awssdk:regions:2.25.50'
resolutionStrategy.force 'software.amazon.awssdk:utils:2.25.50'
resolutionStrategy.force 'software.amazon.awssdk:metrics-spi:2.25.50'
resolutionStrategy.force 'software.amazon.awssdk:annotations:2.25.50'
resolutionStrategy.force 'software.amazon.awssdk:sdk-core:2.25.50'
resolutionStrategy.force 'software.amazon.awssdk:profiles:2.25.50'
resolutionStrategy.force 'software.amazon.awssdk:json-utils:2.25.50'
resolutionStrategy.force 'software.amazon.awssdk:http-auth-spi:2.25.50'
resolutionStrategy.force 'software.amazon.awssdk:http-auth-aws:2.25.50'
resolutionStrategy.force 'software.amazon.awssdk:identity-spi:2.25.50'
resolutionStrategy.force 'software.amazon.awssdk:checksums-spi:2.25.50'
resolutionStrategy.force 'software.amazon.awssdk:checksums:2.25.50'
resolutionStrategy.force 'software.amazon.awssdk:third-party-jackson-core:2.25.50'

resolutionStrategy.force 'org.apache.httpcomponents.core5:httpcore5:5.2.4'
resolutionStrategy.force 'org.apache.httpcomponents.core5:httpcore5-h2:5.2.4'
resolutionStrategy.force 'jakarta.json:jakarta.json-api:2.1.3'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,13 @@ private void deleteConnector(DeleteRequest deleteRequest, String connectorId, Ac
if (throwable != null) {
actionListener.onFailure(new RuntimeException(throwable));
} else {
context.restore();
log.info("Connector deletion result: {}, connector id: {}", r.deleted(), r.id());
DeleteResponse response = new DeleteResponse(r.shardId(), r.id(), 0, 0, 0, r.deleted());
actionListener.onResponse(response);
}
});
} catch (Exception e) {
log.error("Failed to delete ML connector: " + connectorId, e);
log.error("Failed to delete ML connector: {}", connectorId, e);
actionListener.onFailure(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,23 @@
import static org.opensearch.common.xcontent.json.JsonXContent.jsonXContent;
import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;

import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Client;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
Expand All @@ -36,9 +42,12 @@
import org.opensearch.sdk.PutDataObjectResponse;
import org.opensearch.sdk.SdkClient;

import lombok.extern.log4j.Log4j2;

/**
* An implementation of {@link SdkClient} that stores data in a local OpenSearch cluster using the Node Client.
*/
@Log4j2
public class LocalClusterIndicesClient implements SdkClient {

private final Client client;
Expand All @@ -56,69 +65,60 @@ public LocalClusterIndicesClient(Client client, NamedXContentRegistry xContentRe

@Override
public CompletionStage<PutDataObjectResponse> putDataObjectAsync(PutDataObjectRequest request) {
CompletableFuture<PutDataObjectResponse> future = new CompletableFuture<>();
try (XContentBuilder sourceBuilder = XContentFactory.jsonBuilder()) {
client
.index(
new IndexRequest(request.index())
.setRefreshPolicy(IMMEDIATE)
.source(request.dataObject().toXContent(sourceBuilder, EMPTY_PARAMS)),
ActionListener
.wrap(
r -> future
.complete(new PutDataObjectResponse.Builder().id(r.getId()).created(r.getResult() == CREATED).build()),
future::completeExceptionally
)
);
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<PutDataObjectResponse>) () -> {
try (XContentBuilder sourceBuilder = XContentFactory.jsonBuilder()) {
log.info("Indexing data object in {}", request.index());
IndexResponse indexResponse = client
.index(
new IndexRequest(request.index())
.setRefreshPolicy(IMMEDIATE)
.source(request.dataObject().toXContent(sourceBuilder, EMPTY_PARAMS))
)
.actionGet();
log.info("Creation status for id {}: {}", indexResponse.getId(), indexResponse.getResult());
return new PutDataObjectResponse.Builder().id(indexResponse.getId()).created(indexResponse.getResult() == CREATED).build();
} catch (Exception e) {
throw new OpenSearchException(e);
}
}));
}

@Override
public CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRequest request) {
CompletableFuture<GetDataObjectResponse> future = new CompletableFuture<>();
try {
client.get(new GetRequest(request.index(), request.id()), ActionListener.wrap(r -> {
try {
XContentParser parser = jsonXContent
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, r.getSourceAsString());
future.complete(new GetDataObjectResponse.Builder().id(r.getId()).parser(parser).build());
} catch (IOException e) {
// Parsing error
future.completeExceptionally(e);
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<GetDataObjectResponse>) () -> {
try {
log.info("Getting {} from {}", request.id(), request.index());
GetResponse getResponse = client.get(new GetRequest(request.index(), request.id())).actionGet();
if (!getResponse.isExists()) {
throw new OpenSearchStatusException("Data object with id " + request.id() + " not found", RestStatus.NOT_FOUND);
}
}, future::completeExceptionally));
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
XContentParser parser = jsonXContent
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString());
log.info("Retrieved data object");
return new GetDataObjectResponse.Builder().id(getResponse.getId()).parser(parser).build();
} catch (OpenSearchStatusException notFound) {
throw notFound;
} catch (Exception e) {
throw new OpenSearchException(e);
}
}));
}

@Override
public CompletionStage<DeleteDataObjectResponse> deleteDataObjectAsync(DeleteDataObjectRequest request) {
CompletableFuture<DeleteDataObjectResponse> future = new CompletableFuture<>();
try {
client
.delete(
new DeleteRequest(request.index(), request.id()),
ActionListener
.wrap(
r -> future
.complete(
new DeleteDataObjectResponse.Builder()
.id(r.getId())
.shardId(r.getShardId())
.deleted(r.getResult() == DELETED)
.build()
),
future::completeExceptionally
)
);
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<DeleteDataObjectResponse>) () -> {
try {
log.info("Deleting {} from {}", request.id(), request.index());
DeleteResponse deleteResponse = client.delete(new DeleteRequest(request.index(), request.id())).actionGet();
log.info("Deletion status for id {}: {}", deleteResponse.getId(), deleteResponse.getResult());
return new DeleteDataObjectResponse.Builder()
.id(deleteResponse.getId())
.shardId(deleteResponse.getShardId())
.deleted(deleteResponse.getResult() == DELETED)
.build();
} catch (Exception e) {
throw new OpenSearchException(e);
}
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.DeleteRequest;
import org.opensearch.client.opensearch.core.DeleteResponse;
Expand All @@ -26,6 +28,7 @@
import org.opensearch.client.opensearch.core.IndexResponse;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.sdk.DeleteDataObjectRequest;
Expand All @@ -36,9 +39,14 @@
import org.opensearch.sdk.PutDataObjectResponse;
import org.opensearch.sdk.SdkClient;

import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.extern.log4j.Log4j2;

/**
* An implementation of {@link SdkClient} that stores data in a remote OpenSearch cluster using the OpenSearch Java Client.
*/
@Log4j2
public class RemoteClusterIndicesClient implements SdkClient {

private OpenSearchClient openSearchClient;
Expand All @@ -53,59 +61,55 @@ public RemoteClusterIndicesClient(OpenSearchClient openSearchClient) {

@Override
public CompletionStage<PutDataObjectResponse> putDataObjectAsync(PutDataObjectRequest request) {
CompletableFuture<PutDataObjectResponse> future = new CompletableFuture<>();
IndexRequest<?> indexRequest = new IndexRequest.Builder<>().index(request.index()).document(request.dataObject()).build();
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<PutDataObjectResponse>) () -> {
try {
IndexRequest<?> indexRequest = new IndexRequest.Builder<>().index(request.index()).document(request.dataObject()).build();
log.info("Indexing data object in {}", request.index());
IndexResponse indexResponse = openSearchClient.index(indexRequest);
future
.complete(
new PutDataObjectResponse.Builder().id(indexResponse.id()).created(indexResponse.result() == Created).build()
);
log.info("Creation status for id {}: {}", indexResponse.id(), indexResponse.result());
return new PutDataObjectResponse.Builder().id(indexResponse.id()).created(indexResponse.result() == Created).build();
} catch (Exception e) {
future.completeExceptionally(e);
throw new OpenSearchException("Error occurred while indexing data object", e);
}
return null;
});
return future;
}));
}

@Override
public CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRequest request) {
CompletableFuture<GetDataObjectResponse> future = new CompletableFuture<>();
GetRequest getRequest = new GetRequest.Builder().index(request.index()).id(request.id()).build();
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<GetDataObjectResponse>) () -> {
try {
GetRequest getRequest = new GetRequest.Builder().index(request.index()).id(request.id()).build();
log.info("Getting {} from {}", request.id(), request.index());
@SuppressWarnings("rawtypes")
GetResponse<Map> getResponse = openSearchClient.get(getRequest, Map.class);
String source = getResponse.fields().get("_source").toJson().toString();
if (!getResponse.found()) {
throw new OpenSearchStatusException("Data object with id " + request.id() + " not found", RestStatus.NOT_FOUND);
}
String json = new ObjectMapper().writeValueAsString(getResponse.source());
log.info("Retrieved data object");
XContentParser parser = JsonXContent.jsonXContent
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source);
future.complete(new GetDataObjectResponse.Builder().id(getResponse.id()).parser(parser).build());
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json);
return new GetDataObjectResponse.Builder().id(getResponse.id()).parser(parser).build();
} catch (OpenSearchStatusException notFound) {
throw notFound;
} catch (Exception e) {
future.completeExceptionally(e);
throw new OpenSearchException("Error occurred while getting data object", e);
}
return null;
});
return future;
}));
}

@Override
public CompletionStage<DeleteDataObjectResponse> deleteDataObjectAsync(DeleteDataObjectRequest request) {
CompletableFuture<DeleteDataObjectResponse> future = new CompletableFuture<>();
DeleteRequest deleteRequest = new DeleteRequest.Builder().index(request.index()).id(request.id()).build();
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<DeleteDataObjectResponse>) () -> {
try {
DeleteRequest deleteRequest = new DeleteRequest.Builder().index(request.index()).id(request.id()).build();
log.info("Deleting {} from {}", request.id(), request.index());
DeleteResponse deleteResponse = openSearchClient.delete(deleteRequest);
future
.complete(
new DeleteDataObjectResponse.Builder().id(deleteResponse.id()).deleted(deleteResponse.result() == Deleted).build()
);
log.info("Deletion status for id {}: {}", deleteResponse.id(), deleteResponse.result());
return new DeleteDataObjectResponse.Builder().id(deleteResponse.id()).deleted(deleteResponse.result() == Deleted).build();
} catch (Exception e) {
future.completeExceptionally(e);
throw new OpenSearchException("Error occurred while deleting data object", e);
}
return null;
});
return future;
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@
*/
package org.opensearch.ml.sdkclient;

import org.apache.http.HttpHost;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.opensearch.OpenSearchException;
import org.opensearch.client.RestClient;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.aws.AwsSdk2Transport;
import org.opensearch.client.transport.aws.AwsSdk2TransportOptions;
import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.opensearch.common.inject.AbstractModule;
import org.opensearch.core.common.Strings;
import org.opensearch.sdk.SdkClient;

import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.regions.Region;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
* A module for binding this plugin's desired implementation of {@link SdkClient}.
Expand All @@ -29,7 +30,7 @@ public class SdkClientModule extends AbstractModule {
public static final String REGION = "REGION";

private final String remoteMetadataEndpoint;
private final String region;
private final String region; // not using with RestClient

/**
* Instantiate this module using environment variables
Expand Down Expand Up @@ -59,11 +60,21 @@ protected void configure() {
}

private OpenSearchClient createOpenSearchClient() {
SdkHttpClient httpClient = ApacheHttpClient.builder().build();
try {
return new OpenSearchClient(
new AwsSdk2Transport(httpClient, remoteMetadataEndpoint, Region.of(region), AwsSdk2TransportOptions.builder().build())
);
// Basic http(not-s) client using RestClient.
RestClient restClient = RestClient
// This HttpHost syntax works with export REMOTE_METADATA_ENDPOINT=http://127.0.0.1:9200
.builder(HttpHost.create(remoteMetadataEndpoint))
.setStrictDeprecationMode(true)
.setHttpClientConfigCallback(httpClientBuilder -> {
try {
return httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
} catch (Exception e) {
throw new OpenSearchException(e);
}
})
.build();
return new OpenSearchClient(new RestClientTransport(restClient, new JacksonJsonpMapper(new ObjectMapper())));
} catch (Exception e) {
throw new OpenSearchException(e);
}
Expand Down
Loading

0 comments on commit 7d03c1e

Please sign in to comment.