diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index ace3a35dac..5d8865aca9 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -811,6 +811,72 @@ public void testBulkActionUpdateWithActions() throws IOException, InterruptedExc sink.shutdown(); } + @Test + public void testBulkActionUpdateWithDocumentRootKey() throws IOException, InterruptedException { + final String testIndexAlias = "test-alias-upd1"; + final String testTemplateFile = Objects.requireNonNull( + getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile(); + + final String testIdField = "someId"; + final String testId = "foo"; + final String documentRootKey = "root_key"; + + final String originalValue = "Original"; + final String updatedValue = "Updated"; + + + final String createJsonEvent = "{\"" + testIdField + "\": \"" + testId + "\", \"" + documentRootKey + "\": { \"value\": \"" + originalValue + "\"}}"; + + List> testRecords = Collections.singletonList(jsonStringToRecord(createJsonEvent)); + + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + + pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ROOT_KEY, documentRootKey); + pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); + List> aList = new ArrayList<>(); + Map actionMap = new HashMap<>(); + actionMap.put("type", OpenSearchBulkActions.CREATE.toString()); + + + aList.add(actionMap); + pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); + OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + List> retSources = getSearchResponseDocSources(testIndexAlias); + assertThat(retSources.size(), equalTo(1)); + assertThat(retSources.get(0).containsKey(documentRootKey), equalTo(false)); + assertThat((String) retSources.get(0).get("value"), equalTo(originalValue)); + assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + sink.shutdown(); + + // verify metrics + final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); + assertThat(bulkRequestLatencies.size(), equalTo(3)); + // COUNT + Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); + + final String updateJsonEvent = "{\"" + testIdField + "\": \"" + testId + "\", \"" + documentRootKey + "\": { \"value\": \"" + updatedValue + "\"}}"; + + testRecords = Collections.singletonList(jsonStringToRecord(updateJsonEvent)); + aList = new ArrayList<>(); + actionMap = new HashMap<>(); + actionMap.put("type", OpenSearchBulkActions.UPDATE.toString()); + aList.add(actionMap); + pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); + sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + retSources = getSearchResponseDocSources(testIndexAlias); + + assertThat(retSources.size(), equalTo(1)); + Map source = retSources.get(0); + assertThat(source.containsKey(documentRootKey), equalTo(false)); + assertThat((String) source.get("value"), equalTo(updatedValue)); + assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + sink.shutdown(); + } + @Test public void testBulkActionUpsertWithActions() throws IOException, InterruptedException { final String testIndexAlias = "test-alias-upd2"; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 159f543f52..9c2352b3e1 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -40,9 +40,9 @@ import org.opensearch.dataprepper.model.sink.AbstractSink; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.model.sink.SinkContext; -import org.opensearch.dataprepper.plugins.common.opensearch.ServerlessOptionsFactory; import org.opensearch.dataprepper.plugins.common.opensearch.ServerlessNetworkPolicyUpdater; import org.opensearch.dataprepper.plugins.common.opensearch.ServerlessNetworkPolicyUpdaterFactory; +import org.opensearch.dataprepper.plugins.common.opensearch.ServerlessOptionsFactory; import org.opensearch.dataprepper.plugins.dlq.DlqProvider; import org.opensearch.dataprepper.plugins.dlq.DlqWriter; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest; @@ -295,16 +295,28 @@ private BulkOperation getBulkOperationForAction(final String action, } if (StringUtils.equals(action, OpenSearchBulkActions.UPDATE.toString()) || StringUtils.equals(action, OpenSearchBulkActions.UPSERT.toString())) { + + JsonNode filteredJsonNode = jsonNode; + try { + if (isUsingDocumentFilters()) { + filteredJsonNode = objectMapper.reader().readTree(document.getSerializedJson()); + } + } catch (final IOException e) { + throw new RuntimeException( + String.format("An exception occurred while deserializing a document for the %s action: %s", action, e.getMessage())); + } + + final UpdateOperation.Builder updateOperationBuilder = (action.toLowerCase() == OpenSearchBulkActions.UPSERT.toString()) ? new UpdateOperation.Builder<>() .index(indexName) - .document(jsonNode) - .upsert(jsonNode) + .document(filteredJsonNode) + .upsert(filteredJsonNode) .versionType(versionType) .version(version) : new UpdateOperation.Builder<>() .index(indexName) - .document(jsonNode) + .document(filteredJsonNode) .versionType(versionType) .version(version); docId.ifPresent(updateOperationBuilder::id); @@ -416,7 +428,15 @@ public void doOutput(final Collection> records) { StringUtils.equals(action, OpenSearchBulkActions.DELETE.toString())) { serializedJsonNode = SerializedJson.fromJsonNode(event.getJsonNode(), document); } - BulkOperation bulkOperation = getBulkOperationForAction(eventAction, document, version, indexName, event.getJsonNode()); + BulkOperation bulkOperation; + + try { + bulkOperation = getBulkOperationForAction(eventAction, document, version, indexName, event.getJsonNode()); + } catch (final Exception e) { + LOG.error("An exception occurred while constructing the bulk operation for a document: ", e); + logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); + continue; + } BulkOperationWrapper bulkOperationWrapper = new BulkOperationWrapper(bulkOperation, event.getEventHandle(), serializedJsonNode); final long estimatedBytesBeforeAdd = bulkRequest.estimateSizeInBytesWithDocument(bulkOperationWrapper); @@ -583,4 +603,17 @@ private DlqObject createDlqObjectFromEvent(final Event event, .withPluginId(pluginSetting.getName()) .build(); } + + /** + * This function is used for update and upsert bulk actions to determine whether the original JsonNode needs to be filtered down + * based on the user's sink configuration. If a new parameter manipulates the document before sending to OpenSearch, it needs to be added to + * this list to get applied for update and upsert actions + * @return whether the doc + */ + private boolean isUsingDocumentFilters() { + return documentRootKey != null || + (sinkContext.getIncludeKeys() != null && !sinkContext.getIncludeKeys().isEmpty()) || + (sinkContext.getExcludeKeys() != null && !sinkContext.getExcludeKeys().isEmpty()) || + sinkContext.getTagsTargetKey() != null; + } }