Skip to content

Commit

Permalink
Add test classes
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jul 3, 2024
1 parent 6df5770 commit 1cf4c43
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
*/
package org.opensearch.sdk;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,11 @@ public CompletionStage<PutDataObjectResponse> putDataObjectAsync(PutDataObjectRe
item.put(RANGE_KEY, AttributeValue.builder().s(id).build());
final PutItemRequest putItemRequest = PutItemRequest.builder().tableName(tableName).item(item).build();

dynamoDbClient.putItem(putItemRequest);
// TODO need to initialize/return SEQ_NO here
// If document doesn't exist, return 0
// If document exists, overwrite and increment and return SEQ_NO
dynamoDbClient.putItem(putItemRequest);
// TODO need to pass seqNo to simulated response
String simulatedIndexResponse = simulateOpenSearchResponse(
request.index(),
request.id(),
Expand Down Expand Up @@ -208,14 +209,14 @@ public CompletionStage<UpdateDataObjectResponse> updateDataObjectAsync(UpdateDat
);
}
UpdateItemRequest updateItemRequest = updateItemRequestBuilder.build();
dynamoDbClient.updateItem(updateItemRequest);
// TODO need to add an incremented seqNo here
// also integrate with put and delete. Can we fetch from the updateItemResponse returned on previous line?
dynamoDbClient.updateItem(updateItemRequest);
// TODO need to pass seqNo to simulated response
String simulatedUpdateResponse = simulateOpenSearchResponse(request.index(), request.id(), source, Map.of("found", true));
return new UpdateDataObjectResponse.Builder().id(request.id()).parser(createParser(simulatedUpdateResponse)).build();
} catch (ConditionalCheckFailedException ccfe) {
log.error("Document version conflict updating {} in {}: {}", request.id(), request.index(), ccfe.getMessage(), ccfe);
// Rethrow unchecked exception on update IOException
// Rethrow
throw new OpenSearchStatusException(
"Document version conflict updating " + request.id() + " in index " + request.index(),
RestStatus.CONFLICT
Expand Down Expand Up @@ -251,11 +252,12 @@ public CompletionStage<DeleteDataObjectResponse> deleteDataObjectAsync(DeleteDat
.build();
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<DeleteDataObjectResponse>) () -> {
try {
dynamoDbClient.deleteItem(deleteItemRequest);
// TODO need to return SEQ_NO here
// If document doesn't exist, increment and return highest seq no ever seen, but we would have to track seqNo here
// If document never existed, return -2 (unassigned) for seq no (probably what we have to do here)
// If document exists, increment and return SEQ_NO
dynamoDbClient.deleteItem(deleteItemRequest);
// TODO need to pass seqNo to simulated response
String simulatedDeleteResponse = simulateOpenSearchResponse(
request.index(),
request.id(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sdk.DeleteDataObjectRequest;
import org.opensearch.sdk.DeleteDataObjectResponse;
import org.opensearch.sdk.GetDataObjectRequest;
Expand Down Expand Up @@ -146,6 +147,13 @@ public CompletionStage<UpdateDataObjectResponse> updateDataObjectAsync(UpdateDat
}
log.info("Update status for id {}: {}", updateResponse.getId(), updateResponse.getResult());
return new UpdateDataObjectResponse.Builder().id(updateResponse.getId()).parser(createParser(updateResponse)).build();
} catch (VersionConflictEngineException vcee) {
log.error("Document version conflict updating {} in {}: {}", request.id(), request.index(), vcee.getMessage(), vcee);
// Rethrow
throw new OpenSearchStatusException(
"Document version conflict updating " + request.id() + " in index " + request.index(),
RestStatus.CONFLICT
);
} catch (IOException e) {
// Rethrow unchecked exception on XContent parsing error
throw new OpenSearchStatusException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.JsonpSerializable;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.core.DeleteRequest;
import org.opensearch.client.opensearch.core.DeleteResponse;
import org.opensearch.client.opensearch.core.GetRequest;
Expand Down Expand Up @@ -147,6 +148,14 @@ public CompletionStage<UpdateDataObjectResponse> updateDataObjectAsync(UpdateDat
UpdateResponse<Map<String, Object>> updateResponse = openSearchClient.update(updateRequest, MAP_DOCTYPE);
log.info("Update status for id {}: {}", updateResponse.id(), updateResponse.result());
return new UpdateDataObjectResponse.Builder().id(updateResponse.id()).parser(createParser(updateResponse)).build();
} catch (OpenSearchException ose) {
String errorType = ose.status() == RestStatus.CONFLICT.getStatus() ? "Document Version Conflict" : "Failed";
log.error("{} updating {} in {}: {}", errorType, request.id(), request.index(), ose.getMessage(), ose);
// Rethrow
throw new OpenSearchStatusException(
errorType + " updating " + request.id() + " in index " + request.index(),
RestStatus.fromCode(ose.status())
);
} catch (IOException e) {
log.error("Error updating {} in {}: {}", request.id(), request.index(), e.getMessage(), e);
// Rethrow unchecked exception on update IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.opensearch.ml.sdkclient;

import static org.mockito.Mockito.when;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.ml.plugin.MachineLearningPlugin.GENERAL_THREAD_POOL;
import static org.opensearch.ml.plugin.MachineLearningPlugin.ML_THREAD_POOL_PREFIX;
Expand All @@ -30,6 +31,7 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetResponse;
Expand All @@ -41,6 +43,7 @@
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
Expand All @@ -65,6 +68,7 @@

import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
Expand All @@ -76,6 +80,9 @@

public class DDBOpenSearchClientTests extends OpenSearchTestCase {

private static final String HASH_KEY = "_tenant_id";
private static final String RANGE_KEY = "_id";

private static final String TEST_ID = "123";
private static final String TENANT_ID = "TEST_TENANT_ID";
private static final String TEST_INDEX = "test_index";
Expand Down Expand Up @@ -145,8 +152,8 @@ public void testPutDataObject_HappyCase() throws IOException {

PutItemRequest putItemRequest = putItemRequestArgumentCaptor.getValue();
Assert.assertEquals(TEST_INDEX, putItemRequest.tableName());
Assert.assertEquals(TEST_ID, putItemRequest.item().get("id").s());
Assert.assertEquals(TENANT_ID, putItemRequest.item().get("tenant_id").s());
Assert.assertEquals(TEST_ID, putItemRequest.item().get(RANGE_KEY).s());
Assert.assertEquals(TENANT_ID, putItemRequest.item().get(HASH_KEY).s());
Assert.assertEquals("foo", putItemRequest.item().get("data").s());
}

Expand Down Expand Up @@ -194,7 +201,7 @@ public void testPutDataObject_NullTenantId_SetsDefaultTenantId() throws IOExcept
Mockito.verify(dynamoDbClient).putItem(putItemRequestArgumentCaptor.capture());

PutItemRequest putItemRequest = putItemRequestArgumentCaptor.getValue();
Assert.assertEquals("DEFAULT_TENANT", putItemRequest.item().get("tenant_id").s());
Assert.assertEquals("DEFAULT_TENANT", putItemRequest.item().get(HASH_KEY).s());
}

@Test
Expand All @@ -208,7 +215,7 @@ public void testPutDataObject_NullId_SetsDefaultTenantId() throws IOException {
Mockito.verify(dynamoDbClient).putItem(putItemRequestArgumentCaptor.capture());

PutItemRequest putItemRequest = putItemRequestArgumentCaptor.getValue();
Assert.assertNotNull(putItemRequest.item().get("id").s());
Assert.assertNotNull(putItemRequest.item().get(RANGE_KEY).s());
Assert.assertNotNull(response.id());
}

Expand Down Expand Up @@ -243,8 +250,8 @@ public void testGetDataObject_HappyCase() throws IOException {
Mockito.verify(dynamoDbClient).getItem(getItemRequestArgumentCaptor.capture());
GetItemRequest getItemRequest = getItemRequestArgumentCaptor.getValue();
Assert.assertEquals(TEST_INDEX, getItemRequest.tableName());
Assert.assertEquals(TENANT_ID, getItemRequest.key().get("tenant_id").s());
Assert.assertEquals(TEST_ID, getItemRequest.key().get("id").s());
Assert.assertEquals(TENANT_ID, getItemRequest.key().get(HASH_KEY).s());
Assert.assertEquals(TEST_ID, getItemRequest.key().get(RANGE_KEY).s());
Assert.assertEquals(TEST_ID, response.id());
Assert.assertEquals("foo", response.source().get("data"));
XContentParser parser = response.parser();
Expand Down Expand Up @@ -325,7 +332,7 @@ public void testGetDataObject_UseDefaultTenantIdIfNull() throws IOException {
sdkClient.getDataObjectAsync(getRequest, testThreadPool.executor(GENERAL_THREAD_POOL)).toCompletableFuture().join();
Mockito.verify(dynamoDbClient).getItem(getItemRequestArgumentCaptor.capture());
GetItemRequest getItemRequest = getItemRequestArgumentCaptor.getValue();
Assert.assertEquals("DEFAULT_TENANT", getItemRequest.key().get("tenant_id").s());
Assert.assertEquals("DEFAULT_TENANT", getItemRequest.key().get(HASH_KEY).s());
}

@Test
Expand Down Expand Up @@ -353,8 +360,8 @@ public void testDeleteDataObject_HappyCase() throws IOException {
.join();
DeleteItemRequest deleteItemRequest = deleteItemRequestArgumentCaptor.getValue();
Assert.assertEquals(TEST_INDEX, deleteItemRequest.tableName());
Assert.assertEquals(TENANT_ID, deleteItemRequest.key().get("tenant_id").s());
Assert.assertEquals(TEST_ID, deleteItemRequest.key().get("id").s());
Assert.assertEquals(TENANT_ID, deleteItemRequest.key().get(HASH_KEY).s());
Assert.assertEquals(TEST_ID, deleteItemRequest.key().get(RANGE_KEY).s());
Assert.assertEquals(TEST_ID, deleteResponse.id());

DeleteResponse deleteActionResponse = DeleteResponse.fromXContent(deleteResponse.parser());
Expand All @@ -371,7 +378,7 @@ public void testDeleteDataObject_NullTenantId_UsesDefaultTenantId() {
Mockito.when(dynamoDbClient.deleteItem(deleteItemRequestArgumentCaptor.capture())).thenReturn(DeleteItemResponse.builder().build());
sdkClient.deleteDataObjectAsync(deleteRequest, testThreadPool.executor(GENERAL_THREAD_POOL)).toCompletableFuture().join();
DeleteItemRequest deleteItemRequest = deleteItemRequestArgumentCaptor.getValue();
Assert.assertEquals("DEFAULT_TENANT", deleteItemRequest.key().get("tenant_id").s());
Assert.assertEquals("DEFAULT_TENANT", deleteItemRequest.key().get(HASH_KEY).s());
}

@Test
Expand All @@ -391,8 +398,8 @@ public void updateDataObjectAsync_HappyCase() {
UpdateItemRequest updateItemRequest = updateItemRequestArgumentCaptor.getValue();
assertEquals(TEST_ID, updateRequest.id());
assertEquals(TEST_INDEX, updateItemRequest.tableName());
assertEquals(TEST_ID, updateItemRequest.key().get("id").s());
assertEquals(TENANT_ID, updateItemRequest.key().get("tenant_id").s());
assertEquals(TEST_ID, updateItemRequest.key().get(RANGE_KEY).s());
assertEquals(TENANT_ID, updateItemRequest.key().get(HASH_KEY).s());
assertEquals("foo", updateItemRequest.key().get("data").s());

}
Expand All @@ -414,8 +421,8 @@ public void updateDataObjectAsync_HappyCaseWithMap() {
UpdateItemRequest updateItemRequest = updateItemRequestArgumentCaptor.getValue();
assertEquals(TEST_ID, updateRequest.id());
assertEquals(TEST_INDEX, updateItemRequest.tableName());
assertEquals(TEST_ID, updateItemRequest.key().get("id").s());
assertEquals(TENANT_ID, updateItemRequest.key().get("tenant_id").s());
assertEquals(TEST_ID, updateItemRequest.key().get(RANGE_KEY).s());
assertEquals(TENANT_ID, updateItemRequest.key().get(HASH_KEY).s());
assertEquals("bar", updateItemRequest.key().get("foo").s());

}
Expand All @@ -431,7 +438,29 @@ public void updateDataObjectAsync_NullTenantId_UsesDefaultTenantId() {
Mockito.when(dynamoDbClient.updateItem(updateItemRequestArgumentCaptor.capture())).thenReturn(UpdateItemResponse.builder().build());
sdkClient.updateDataObjectAsync(updateRequest, testThreadPool.executor(GENERAL_THREAD_POOL)).toCompletableFuture().join();
UpdateItemRequest updateItemRequest = updateItemRequestArgumentCaptor.getValue();
assertEquals(TENANT_ID, updateItemRequest.key().get("tenant_id").s());
assertEquals(TENANT_ID, updateItemRequest.key().get(HASH_KEY).s());
}

public void testUpdateDataObject_VersionCheck() throws IOException {
UpdateDataObjectRequest updateRequest = new UpdateDataObjectRequest.Builder()
.index(TEST_INDEX)
.id(TEST_ID)
.dataObject(testDataObject)
.ifSeqNo(5)
.ifPrimaryTerm(2)
.build();

ConditionalCheckFailedException conflictException = ConditionalCheckFailedException.builder().build();
when(dynamoDbClient.updateItem(updateItemRequestArgumentCaptor.capture())).thenThrow(conflictException);

CompletableFuture<UpdateDataObjectResponse> future = sdkClient
.updateDataObjectAsync(updateRequest, testThreadPool.executor(GENERAL_THREAD_POOL))
.toCompletableFuture();

CompletionException ce = assertThrows(CompletionException.class, () -> future.join());
Throwable cause = ce.getCause();
assertEquals(OpenSearchStatusException.class, cause.getClass());
assertEquals(RestStatus.CONFLICT, ((OpenSearchStatusException) cause).status());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.get.GetResult;
import org.opensearch.sdk.DeleteDataObjectRequest;
import org.opensearch.sdk.DeleteDataObjectResponse;
Expand Down Expand Up @@ -423,6 +424,33 @@ public void testUpdateDataObject_Exception() throws IOException {
assertEquals("test", cause.getMessage());
}

public void testUpdateDataObject_VersionCheck() throws IOException {
UpdateDataObjectRequest updateRequest = new UpdateDataObjectRequest.Builder()
.index(TEST_INDEX)
.id(TEST_ID)
.dataObject(testDataObject)
.ifSeqNo(5)
.ifPrimaryTerm(2)
.build();

ArgumentCaptor<UpdateRequest> updateRequestCaptor = ArgumentCaptor.forClass(UpdateRequest.class);
VersionConflictEngineException conflictException = new VersionConflictEngineException(
new ShardId(TEST_INDEX, "_na_", 0),
TEST_ID,
"test"
);
when(mockedClient.update(updateRequestCaptor.capture())).thenThrow(conflictException);

CompletableFuture<UpdateDataObjectResponse> future = sdkClient
.updateDataObjectAsync(updateRequest, testThreadPool.executor(GENERAL_THREAD_POOL))
.toCompletableFuture();

CompletionException ce = assertThrows(CompletionException.class, () -> future.join());
Throwable cause = ce.getCause();
assertEquals(OpenSearchStatusException.class, cause.getClass());
assertEquals(RestStatus.CONFLICT, ((OpenSearchStatusException) cause).status());
}

public void testDeleteDataObject() throws IOException {
DeleteDataObjectRequest deleteRequest = new DeleteDataObjectRequest.Builder().index(TEST_INDEX).id(TEST_ID).build();

Expand Down
Loading

0 comments on commit 1cf4c43

Please sign in to comment.