diff --git a/plugin/src/main/java/org/opensearch/ml/sdkclient/DDBOpenSearchClient.java b/plugin/src/main/java/org/opensearch/ml/sdkclient/DDBOpenSearchClient.java index e4ed9dc019..4c8819f2cd 100644 --- a/plugin/src/main/java/org/opensearch/ml/sdkclient/DDBOpenSearchClient.java +++ b/plugin/src/main/java/org/opensearch/ml/sdkclient/DDBOpenSearchClient.java @@ -233,29 +233,13 @@ public CompletionStage updateDataObjectAsync( String source = Strings.toString(MediaTypeRegistry.JSON, request.dataObject()); JsonNode jsonNode = OBJECT_MAPPER.readTree(source); Map updateItem = JsonTransformer.convertJsonObjectToDDBAttributeMap(jsonNode); - updateItem.remove(HASH_KEY); + updateItem.remove(TENANT_ID); updateItem.remove(RANGE_KEY); Map updateKey = new HashMap<>(); updateKey.put(HASH_KEY, AttributeValue.builder().s(tenantId).build()); updateKey.put(RANGE_KEY, AttributeValue.builder().s(request.id()).build()); - UpdateItemRequest.Builder updateItemRequestBuilder = UpdateItemRequest.builder().tableName(request.index()).key(updateKey); - Map expressionAttributeNames = new HashMap<>(); - expressionAttributeNames.put("#seqNo", SEQ_NO_KEY); - expressionAttributeNames.put("#source", SOURCE); - Map expressionAttributeValues = new HashMap<>(); - expressionAttributeValues.put(":incr", AttributeValue.builder().n("1").build()); - expressionAttributeValues.put(":source", AttributeValue.builder().m(updateItem).build()); - updateItemRequestBuilder.updateExpression("SET #seqNo = #seqNo + :incr, #source = :source "); - if (request.ifSeqNo() != null) { - // Get current document version and put in attribute map. Ignore primary term on DDB. - updateItemRequestBuilder.conditionExpression("#seqNo = :currentSeqNo"); - expressionAttributeValues.put(":currentSeqNo", AttributeValue.builder().n(Long.toString(request.ifSeqNo())).build()); - } - updateItemRequestBuilder - .expressionAttributeNames(expressionAttributeNames) - .expressionAttributeValues(expressionAttributeValues); - UpdateItemRequest updateItemRequest = updateItemRequestBuilder.build(); - Long sequenceNumber = updateItemWithRetryOnConflict(updateItemRequest, request); + + Long sequenceNumber = updateItemWithRetryOnConflict(updateKey, updateItem, request); String simulatedUpdateResponse = simulateOpenSearchResponse( request.index(), request.id(), @@ -275,10 +259,41 @@ public CompletionStage updateDataObjectAsync( }), executor); } - private Long updateItemWithRetryOnConflict(UpdateItemRequest updateItemRequest, UpdateDataObjectRequest request) { + private Long updateItemWithRetryOnConflict( + Map updateKey, + Map updateItem, + UpdateDataObjectRequest request + ) { int retriesRemaining = request.retryOnConflict(); do { try { + UpdateItemRequest.Builder updateItemRequestBuilder = UpdateItemRequest.builder().tableName(request.index()).key(updateKey); + updateItemRequestBuilder.updateExpression("SET #seqNo = #seqNo + :incr, #source = :source "); + // Get current document version and put in attribute map. Ignore primary term on DDB. + updateItemRequestBuilder.conditionExpression("#seqNo = :currentSeqNo"); + Map expressionAttributeNames = new HashMap<>(); + expressionAttributeNames.put("#seqNo", SEQ_NO_KEY); + expressionAttributeNames.put("#source", SOURCE); + Map expressionAttributeValues = new HashMap<>(); + expressionAttributeValues.put(":incr", AttributeValue.builder().n("1").build()); + // Fetch current item and extract data object + Map currentItem = dynamoDbClient + .getItem(GetItemRequest.builder().tableName(request.index()).key(updateKey).build()) + .item(); + Map dataObject = new HashMap<>(currentItem.get(SOURCE).m()); + // Update existing with changes + dataObject.putAll(updateItem); + expressionAttributeValues.put(":source", AttributeValue.builder().m(dataObject).build()); + // Use seqNo from the object we got to make sure we're updating the same thing + if (request.ifSeqNo() != null) { + expressionAttributeValues.put(":currentSeqNo", AttributeValue.builder().n(Long.toString(request.ifSeqNo())).build()); + } else { + expressionAttributeValues.put(":currentSeqNo", currentItem.get(SEQ_NO_KEY)); + } + updateItemRequestBuilder + .expressionAttributeNames(expressionAttributeNames) + .expressionAttributeValues(expressionAttributeValues); + UpdateItemRequest updateItemRequest = updateItemRequestBuilder.build(); UpdateItemResponse updateItemResponse = dynamoDbClient.updateItem(updateItemRequest); if (updateItemResponse != null && updateItemResponse.attributes() != null