Skip to content

Commit

Permalink
Add more parameters to DataObject requests and responses
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Nov 5, 2024
1 parent 7f19078 commit 05a64d1
Show file tree
Hide file tree
Showing 19 changed files with 413 additions and 61 deletions.
19 changes: 19 additions & 0 deletions common/src/main/java/org/opensearch/sdk/BulkDataObjectRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
import java.util.List;
import java.util.Set;

import org.opensearch.action.support.WriteRequest.RefreshPolicy;
import org.opensearch.common.Nullable;
import org.opensearch.core.common.Strings;

public class BulkDataObjectRequest {

private final List<DataObjectRequest> requests = new ArrayList<>();
private final Set<String> indices = new HashSet<>();
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
private String globalIndex;
private String globalTenantId;

Expand Down Expand Up @@ -87,6 +89,23 @@ public BulkDataObjectRequest add(DataObjectRequest request) {
return this;
}

/**
* Should this request trigger a refresh ({@linkplain RefreshPolicy#IMMEDIATE}), wait for a refresh (
* {@linkplain RefreshPolicy#WAIT_UNTIL}), or proceed ignore refreshes entirely ({@linkplain RefreshPolicy#NONE}, the default).
*/
public BulkDataObjectRequest setRefreshPolicy(RefreshPolicy refreshPolicy) {
this.refreshPolicy = refreshPolicy;
return this;
}

/**
* Should this request trigger a refresh ({@linkplain RefreshPolicy#IMMEDIATE}), wait for a refresh (
* {@linkplain RefreshPolicy#WAIT_UNTIL}), or proceed ignore refreshes entirely ({@linkplain RefreshPolicy#NONE}, the default).
*/
public RefreshPolicy getRefreshPolicy() {
return refreshPolicy;
}

/**
* Instantiate a builder for this object
* @return a builder instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,28 @@

import java.util.Arrays;

import org.opensearch.core.xcontent.XContentParser;

import static org.opensearch.action.bulk.BulkResponse.NO_INGEST_TOOK;

public class BulkDataObjectResponse {

private final DataObjectResponse[] responses;
private final DataObjectResponse[] responses;
private final long tookInMillis;
private final long ingestTookInMillis;
private final boolean failures;
private final XContentParser parser;

public BulkDataObjectResponse(DataObjectResponse[] responses, long tookInMillis) {
this(responses, tookInMillis, NO_INGEST_TOOK);
public BulkDataObjectResponse(DataObjectResponse[] responses, long tookInMillis, boolean failures, XContentParser parser) {
this(responses, tookInMillis, NO_INGEST_TOOK, failures, parser);
}

public BulkDataObjectResponse(DataObjectResponse[] responses, long tookInMillis, long ingestTookInMillis) {
public BulkDataObjectResponse(DataObjectResponse[] responses, long tookInMillis, long ingestTookInMillis, boolean failures, XContentParser parser) {
this.responses = responses;
this.tookInMillis = tookInMillis;
this.ingestTookInMillis = ingestTookInMillis;
this.failures = failures;
this.parser = parser;
}

/**
Expand Down Expand Up @@ -57,6 +63,14 @@ public long getIngestTookInMillis() {
* @return true if any response failed, false otherwise
*/
public boolean hasFailures() {
return Arrays.stream(responses).anyMatch(DataObjectResponse::isFailed);
return this.failures;
}

/**
* Returns the parser
* @return the parser
*/
public XContentParser parser() {
return this.parser;
}
}
71 changes: 69 additions & 2 deletions common/src/main/java/org/opensearch/sdk/DataObjectResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,43 @@
*/
package org.opensearch.sdk;

import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentParser;

public abstract class DataObjectResponse {
private final String index;
private final String id;
private final XContentParser parser;
private final boolean failed;
private final Exception cause;
private final RestStatus status;

/**
* Instantiate this request with an id and parser representing a Response
* Instantiate this request with an index, id, failure status, and parser representing a Response
* <p>
* For data storage implementations other than OpenSearch, the id may be referred to as a primary key.
* @param index the index
* @param id the document id
* @param parser a parser that can be used to create a Response
* @param failed whether the request failed
* @param cause the Exception causing the failure
* @param status the RestStatus
*/
protected DataObjectResponse(String id, XContentParser parser, boolean failed) {
protected DataObjectResponse(String index, String id, XContentParser parser, boolean failed, Exception cause, RestStatus status) {
this.index = index;
this.id = id;
this.parser = parser;
this.failed = failed;
this.cause = cause;
this.status = status;
}

/**
* Returns the index
* @return the index
*/
public String index() {
return this.index;
}

/**
Expand All @@ -53,19 +71,48 @@ public boolean isFailed() {
return this.failed;
}

/**
* The actual cause of the failure.
* @return the Exception causing the failure
*/
public Exception cause() {
return this.cause;
}

/**
* The rest status.
* @return the rest status.
*/
public RestStatus status() {
return this.status;
}

/**
* Superclass for common fields in subclass builders
*/
public static class Builder<T extends Builder<T>> {
protected String index = null;
protected String id = null;
protected XContentParser parser;
protected boolean failed = false;
protected Exception cause = null;
protected RestStatus status = null;

/**
* Empty constructor to initialize
*/
protected Builder() {}

/**
* Add an index to this builder
* @param index the index to add
* @return the updated builder
*/
public T index(String index) {
this.index = index;
return self();
}

/**
* Add an id to this builder
* @param id the id to add
Expand Down Expand Up @@ -96,6 +143,26 @@ public T failed(boolean failed) {
return self();
}

/**
* Add a cause to this builder
* @param cause the Exception
* @return the updated builder
*/
public T cause(Exception cause) {
this.cause = cause;
return self();
}

/**
* Add a rest status to this builder
* @param status the rest status
* @return the updated builder
*/
public T status(RestStatus status) {
this.status = status;
return self();
}

/**
* Returns this builder as the parameterized type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.opensearch.sdk;

import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentParser;

public class DeleteDataObjectResponse extends DataObjectResponse {
Expand All @@ -16,12 +17,15 @@ public class DeleteDataObjectResponse extends DataObjectResponse {
* Instantiate this request with an id and parser representing a DeleteResponse
* <p>
* For data storage implementations other than OpenSearch, the id may be referred to as a primary key.
* @param index the index
* @param id the document id
* @param parser a parser that can be used to create a DeleteResponse
* @param failed whether the request failed
* @param cause the Exception causing the failure
* @param status the RestStatus
*/
public DeleteDataObjectResponse(String id, XContentParser parser, boolean failed) {
super(id, parser, failed);
public DeleteDataObjectResponse(String index, String id, XContentParser parser, boolean failed, Exception cause, RestStatus status) {
super(index, id, parser, failed, cause, status);
}

/**
Expand All @@ -42,7 +46,7 @@ public static class Builder extends DataObjectResponse.Builder<Builder> {
* @return A {@link DeleteDataObjectResponse}
*/
public DeleteDataObjectResponse build() {
return new DeleteDataObjectResponse(this.id, this.parser, this.failed);
return new DeleteDataObjectResponse(this.index, this.id, this.parser, this.failed, this.cause, this.status);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.Collections;
import java.util.Map;

import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentParser;

public class GetDataObjectResponse extends DataObjectResponse {
Expand All @@ -20,13 +21,16 @@ public class GetDataObjectResponse extends DataObjectResponse {
* Instantiate this request with an id and parser/map used to recreate the data object.
* <p>
* For data storage implementations other than OpenSearch, the id may be referred to as a primary key.
* @param index the index
* @param id the document id
* @param parser a parser that can be used to create a GetResponse
* @param failed whether the request failed
* @param cause the Exception causing the failure
* @param status the RestStatus
* @param source the data object as a map
*/
public GetDataObjectResponse(String id, XContentParser parser, boolean failed, Map<String, Object> source) {
super(id, parser, failed);
public GetDataObjectResponse(String index, String id, XContentParser parser, boolean failed, Exception cause, RestStatus status, Map<String, Object> source) {
super(index, id, parser, failed, cause, status);
this.source = source;
}

Expand Down Expand Up @@ -67,7 +71,7 @@ public Builder source(Map<String, Object> source) {
* @return A {@link GetDataObjectResponse}
*/
public GetDataObjectResponse build() {
return new GetDataObjectResponse(this.id, this.parser, this.failed, this.source);
return new GetDataObjectResponse(this.index, this.id, this.parser, this.failed, this.cause, this.status, this.source);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.opensearch.sdk;

import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentParser;

public class PutDataObjectResponse extends DataObjectResponse {
Expand All @@ -16,12 +17,15 @@ public class PutDataObjectResponse extends DataObjectResponse {
* Instantiate this request with an id and parser representing an IndexResponse
* <p>
* For data storage implementations other than OpenSearch, the id may be referred to as a primary key.
* @param index the index
* @param id the document id
* @param parser a parser that can be used to create an IndexResponse
* @param failed whether the request failed
* @param cause the Exception causing the failure
* @param status the RestStatus
*/
public PutDataObjectResponse(String id, XContentParser parser, boolean failed) {
super(id, parser, failed);
public PutDataObjectResponse(String index, String id, XContentParser parser, boolean failed, Exception cause, RestStatus status) {
super(index, id, parser, failed, cause, status);
}

/**
Expand All @@ -42,7 +46,7 @@ public static class Builder extends DataObjectResponse.Builder<Builder> {
* @return A {@link PutDataObjectResponse}
*/
public PutDataObjectResponse build() {
return new PutDataObjectResponse(this.id, this.parser, this.failed);
return new PutDataObjectResponse(this.index, this.id, this.parser, this.failed, this.cause, this.status);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.opensearch.sdk;

import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentParser;

public class UpdateDataObjectResponse extends DataObjectResponse {
Expand All @@ -16,12 +17,15 @@ public class UpdateDataObjectResponse extends DataObjectResponse {
* Instantiate this request with an id and parser representing an UpdateResponse
* <p>
* For data storage implementations other than OpenSearch, the id may be referred to as a primary key.
* @param index the index
* @param id the document id
* @param parser a parser that can be used to create an UpdateResponse
* @param failed whether the request failed
* @param cause the Exception causing the failure
* @param status the RestStatus
*/
public UpdateDataObjectResponse(String id, XContentParser parser, boolean failed) {
super(id, parser, failed);
public UpdateDataObjectResponse(String index, String id, XContentParser parser, boolean failed, Exception cause, RestStatus status) {
super(index, id, parser, failed, cause, status);
}

/**
Expand All @@ -42,7 +46,7 @@ public static class Builder extends DataObjectResponse.Builder<Builder> {
* @return A {@link UpdateDataObjectResponse}
*/
public UpdateDataObjectResponse build() {
return new UpdateDataObjectResponse(this.id, this.parser, this.failed);
return new UpdateDataObjectResponse(this.index, this.id, this.parser, this.failed, this.cause, this.status);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,13 @@ public CompletionStage<BulkDataObjectResponse> bulkDataObjectAsync(
);
}
}
return new BulkDataObjectResponse(responses, bulkResponse.getTook().millis(), bulkResponse.getIngestTookInMillis());
return new BulkDataObjectResponse(
responses,
bulkResponse.getTook().millis(),
bulkResponse.getIngestTookInMillis(),
bulkResponse.hasFailures(),
createParser(bulkResponse)
);
} catch (IOException e) {
// Rethrow unchecked exception on XContent parsing error
throw new OpenSearchStatusException("Failed to parse data object in a bulk response", RestStatus.INTERNAL_SERVER_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import org.junit.Before;
import org.junit.Test;
import org.opensearch.action.support.WriteRequest.RefreshPolicy;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -40,10 +41,12 @@ public void testBulkDataObjectRequest() {
.build()
.add(PutDataObjectRequest.builder().index(testIndex).build())
.add(UpdateDataObjectRequest.builder().build())
.add(DeleteDataObjectRequest.builder().index(testIndex).tenantId(testTenantId).build());
.add(DeleteDataObjectRequest.builder().index(testIndex).tenantId(testTenantId).build())
.setRefreshPolicy(RefreshPolicy.IMMEDIATE);

assertEquals(Set.of(testIndex, testGlobalIndex), request.getIndices());
assertEquals(3, request.requests().size());
assertEquals(RefreshPolicy.IMMEDIATE, request.getRefreshPolicy());

DataObjectRequest r0 = request.requests().get(0);
assertTrue(r0 instanceof PutDataObjectRequest);
Expand Down
Loading

0 comments on commit 05a64d1

Please sign in to comment.