diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java index e0e36d9237..fa83819a4d 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java @@ -110,6 +110,18 @@ public interface Event extends Serializable { */ void clear(); + /** + * Merges another Event into the current Event. + * The values from the other Event will overwrite the values in the current Event for all keys in the current Event. + * If the other Event has keys that are not in the current Event, they will be unmodified. + * + * @param other the other Event to merge into this Event + * @throws IllegalArgumentException if the input event is not compatible to merge. + * @throws UnsupportedOperationException if the current Event does not support merging. + * @since 2.11 + */ + void merge(Event other); + /** * Generates a serialized Json string of the entire Event * diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 09a0705e0e..741216becc 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -123,7 +123,6 @@ public static Event fromMessage(String message) { } private JsonNode getInitialJsonNode(final Object data) { - if (data == null) { return mapper.valueToTree(new HashMap<>()); } else if (data instanceof String) { @@ -348,6 +347,23 @@ public void clear() { } } + @Override + public void merge(final Event other) { + if(!(other instanceof JacksonEvent)) + throw new IllegalArgumentException("Unable to merge the Event. The input Event must be a JacksonEvent."); + final JacksonEvent otherJacksonEvent = (JacksonEvent) other; + if(!(otherJacksonEvent.jsonNode instanceof ObjectNode)) { + throw new IllegalArgumentException("Unable to merge the Event. The input Event must be a JacksonEvent with object data."); + } + final ObjectNode otherObjectNode = (ObjectNode) otherJacksonEvent.jsonNode; + + if(!(jsonNode instanceof ObjectNode)) { + throw new UnsupportedOperationException("Unable to merge the Event. The current Event must have object data."); + } + + ((ObjectNode) jsonNode).setAll(otherObjectNode); + } + @Override public String toJsonString() { return jsonNode.toString(); @@ -355,7 +371,6 @@ public String toJsonString() { @Override public String getAsJsonString(EventKey key) { - JacksonEventKey jacksonEventKey = asJacksonEventKey(key); final JsonNode node = getNode(jacksonEventKey); if (node.isMissingNode()) { diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java index d16bc345c8..a1fd74b1e1 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java @@ -446,6 +446,52 @@ public void testClear() { assertThat(event.toMap().size(), equalTo(0)); } + @Test + void merge_with_non_JacksonEvent_throws() { + final Event otherEvent = mock(Event.class); + assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent)); + } + + @Test + void merge_with_array_JsonNode_throws() { + final JacksonEvent otherEvent = (JacksonEvent) event; + event = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(List.of(UUID.randomUUID().toString())).build(); + assertThrows(UnsupportedOperationException.class, () -> event.merge(otherEvent)); + } + + @Test + void merge_with_array_JsonNode_in_other_throws() { + Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(List.of(UUID.randomUUID().toString())).build(); + assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent)); + } + + @Test + void merge_sets_all_values() { + final String jsonString = "{\"a\": \"alpha\", \"info\": {\"ids\": {\"id\":\"idx\"}}}"; + event.put("b", "original"); + Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build(); + event.merge(otherEvent); + + assertThat(event.get("b", Object.class), equalTo("original")); + assertThat(event.get("a", Object.class), equalTo("alpha")); + assertThat(event.containsKey("info"), equalTo(true)); + assertThat(event.get("info/ids/id", String.class), equalTo("idx")); + } + + @Test + void merge_overrides_existing_values() { + final String jsonString = "{\"a\": \"alpha\", \"info\": {\"ids\": {\"id\":\"idx\"}}}"; + event.put("a", "original"); + event.put("b", "original"); + Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build(); + event.merge(otherEvent); + + assertThat(event.get("b", Object.class), equalTo("original")); + assertThat(event.get("a", Object.class), equalTo("alpha")); + assertThat(event.containsKey("info"), equalTo(true)); + assertThat(event.get("info/ids/id", String.class), equalTo("idx")); + } + @ParameterizedTest @ValueSource(strings = {"/", "foo", "/foo", "/foo/bar", "foo/bar", "foo/bar/", "/foo/bar/leaf/key"}) public void testDelete_withNonexistentKey(final String key) {