Commit

Fix bug where update/upsert bulk action did not filter exclude/include keys, document_root_key, etc (#3747) (#3801)

Signed-off-by: Taylor Gray <[email protected]>
(cherry picked from commit 48d0d72)

Co-authored-by: Taylor Gray <[email protected]>
1 parent 3bfd4f0 commit eed9c3d
Showing 2 changed files with 104 additions and 5 deletions.
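
The bug, in brief: when the sink is configured with include_keys, exclude_keys, document_root_key, or a tags target key, update and upsert bulk operations were built from the raw event JsonNode, bypassing those filters. For example, with document_root_key set to root_key, an event {"someId": "foo", "root_key": {"value": "v"}} should be indexed as {"value": "v"}. Below is a minimal Jackson sketch of that root-key filtering; extractRoot is an illustrative helper, not a Data Prepper API:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

class DocumentRootKeySketch {
    // Illustrative stand-in for document_root_key handling: index only the
    // subtree under rootKey when it exists, otherwise the whole document.
    static JsonNode extractRoot(final JsonNode document, final String rootKey) {
        return document.has(rootKey) ? document.get(rootKey) : document;
    }

    public static void main(final String[] args) throws Exception {
        final ObjectMapper mapper = new ObjectMapper();
        final JsonNode event = mapper.readTree(
                "{\"someId\": \"foo\", \"root_key\": {\"value\": \"Original\"}}");
        // Prints {"value":"Original"} -- the document that update/upsert
        // should send, and only sends after this fix.
        System.out.println(extractRoot(event, "root_key"));
    }
}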
@@ -811,6 +811,72 @@ public void testBulkActionUpdateWithActions() throws IOException, InterruptedExc
sink.shutdown();
}

@Test
public void testBulkActionUpdateWithDocumentRootKey() throws IOException, InterruptedException {
final String testIndexAlias = "test-alias-upd1";
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile();

final String testIdField = "someId";
final String testId = "foo";
final String documentRootKey = "root_key";

final String originalValue = "Original";
final String updatedValue = "Updated";

final String createJsonEvent = "{\"" + testIdField + "\": \"" + testId + "\", \"" + documentRootKey + "\": { \"value\": \"" + originalValue + "\"}}";

List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(createJsonEvent));

final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile);

pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ROOT_KEY, documentRootKey);
pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
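// With these settings the sink should index only the subtree under root_key
// and derive the document id from someId.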
List<Map<String, Object>> aList = new ArrayList<>();
Map<String, Object> actionMap = new HashMap<>();
actionMap.put("type", OpenSearchBulkActions.CREATE.toString());
aList.add(actionMap);
pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList);
OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
sink.output(testRecords);
List<Map<String, Object>> retSources = getSearchResponseDocSources(testIndexAlias);
assertThat(retSources.size(), equalTo(1));
assertThat(retSources.get(0).containsKey(documentRootKey), equalTo(false));
assertThat((String) retSources.get(0).get("value"), equalTo(originalValue));
assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1)));
sink.shutdown();

// verify metrics
final List<Measurement> bulkRequestLatencies = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
.add(OpenSearchSink.BULKREQUEST_LATENCY).toString());
assertThat(bulkRequestLatencies.size(), equalTo(3));
// COUNT
Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0);

final String updateJsonEvent = "{\"" + testIdField + "\": \"" + testId + "\", \"" + documentRootKey + "\": { \"value\": \"" + updatedValue + "\"}}";

testRecords = Collections.singletonList(jsonStringToRecord(updateJsonEvent));
aList = new ArrayList<>();
actionMap = new HashMap<>();
actionMap.put("type", OpenSearchBulkActions.UPDATE.toString());
aList.add(actionMap);
pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList);
sink = createObjectUnderTest(pluginSetting, true);
sink.output(testRecords);
retSources = getSearchResponseDocSources(testIndexAlias);

assertThat(retSources.size(), equalTo(1));
Map<String, Object> source = retSources.get(0);
assertThat(source.containsKey(documentRootKey), equalTo(false));
assertThat((String) source.get("value"), equalTo(updatedValue));
assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1)));
sink.shutdown();
}

@Test
public void testBulkActionUpsertWithActions() throws IOException, InterruptedException {
final String testIndexAlias = "test-alias-upd2";
@@ -40,9 +40,9 @@
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.plugins.common.opensearch.ServerlessNetworkPolicyUpdater;
import org.opensearch.dataprepper.plugins.common.opensearch.ServerlessNetworkPolicyUpdaterFactory;
import org.opensearch.dataprepper.plugins.common.opensearch.ServerlessOptionsFactory;
import org.opensearch.dataprepper.plugins.dlq.DlqProvider;
import org.opensearch.dataprepper.plugins.dlq.DlqWriter;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest;
@@ -295,16 +295,28 @@ private BulkOperation getBulkOperationForAction(final String action,
}
if (StringUtils.equals(action, OpenSearchBulkActions.UPDATE.toString()) ||
StringUtils.equals(action, OpenSearchBulkActions.UPSERT.toString())) {

JsonNode filteredJsonNode = jsonNode;
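// When include/exclude keys, document_root_key, or a tags target key are
// configured, rebuild the node from the already-filtered serialized document
// instead of using the raw event JsonNode.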
try {
if (isUsingDocumentFilters()) {
filteredJsonNode = objectMapper.reader().readTree(document.getSerializedJson());
}
} catch (final IOException e) {
throw new RuntimeException(
String.format("An exception occurred while deserializing a document for the %s action: %s", action, e.getMessage()));
}

final UpdateOperation.Builder<Object> updateOperationBuilder = StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString()) ?
new UpdateOperation.Builder<>()
.index(indexName)
.document(filteredJsonNode)
.upsert(filteredJsonNode)
.versionType(versionType)
.version(version) :
new UpdateOperation.Builder<>()
.index(indexName)
.document(filteredJsonNode)
.versionType(versionType)
.version(version);
docId.ifPresent(updateOperationBuilder::id);
@@ -416,7 +428,15 @@ public void doOutput(final Collection<Record<Event>> records) {
StringUtils.equals(action, OpenSearchBulkActions.DELETE.toString())) {
serializedJsonNode = SerializedJson.fromJsonNode(event.getJsonNode(), document);
}
BulkOperation bulkOperation;

try {
bulkOperation = getBulkOperationForAction(eventAction, document, version, indexName, event.getJsonNode());
} catch (final Exception e) {
LOG.error("An exception occurred while constructing the bulk operation for a document: ", e);
logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e);
continue;
}

BulkOperationWrapper bulkOperationWrapper = new BulkOperationWrapper(bulkOperation, event.getEventHandle(), serializedJsonNode);
final long estimatedBytesBeforeAdd = bulkRequest.estimateSizeInBytesWithDocument(bulkOperationWrapper);
@@ -583,4 +603,17 @@ private DlqObject createDlqObjectFromEvent(final Event event,
.withPluginId(pluginSetting.getName())
.build();
}

/**
* This function is used for update and upsert bulk actions to determine whether the original JsonNode needs to be filtered down
* based on the user's sink configuration. If a new parameter that manipulates the document before it is sent to OpenSearch is added,
* it must be included in this check so that it is applied for update and upsert actions.
* @return whether the document needs to be filtered based on the sink configuration
*/
private boolean isUsingDocumentFilters() {
return documentRootKey != null ||
(sinkContext.getIncludeKeys() != null && !sinkContext.getIncludeKeys().isEmpty()) ||
(sinkContext.getExcludeKeys() != null && !sinkContext.getExcludeKeys().isEmpty()) ||
sinkContext.getTagsTargetKey() != null;
}
}
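
As the Javadoc above notes, any new setting that reshapes the document must also be reflected in isUsingDocumentFilters(). Below is a minimal sketch of the include/exclude-keys side of that filtering, using Jackson's ObjectNode; filterKeys is an illustrative helper, not the sink's actual implementation:

import java.util.List;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

final class IncludeExcludeKeysSketch {
    // Illustrative stand-in for include_keys/exclude_keys handling:
    // keep only includeKeys (when non-empty), then drop excludeKeys.
    static ObjectNode filterKeys(final ObjectNode document,
                                 final List<String> includeKeys,
                                 final List<String> excludeKeys) {
        if (includeKeys != null && !includeKeys.isEmpty()) {
            document.retain(includeKeys);
        }
        if (excludeKeys != null && !excludeKeys.isEmpty()) {
            excludeKeys.forEach(document::remove);
        }
        return document;
    }

    public static void main(final String[] args) throws Exception {
        final ObjectNode doc = (ObjectNode) new ObjectMapper()
                .readTree("{\"a\": 1, \"b\": 2, \"c\": 3}");
        // Prints {"a":1}: include_keys = [a, b] keeps a and b, then exclude_keys = [b] drops b.
        System.out.println(filterKeys(doc, List.of("a", "b"), List.of("b")));
    }
}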
