diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java index a27906b2cc..04139a1669 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java @@ -65,6 +65,12 @@ public interface Event extends Serializable { */ void delete(String key); + /** + * Delete all keys from the Event + * @since 2.8 + */ + void clear(); + /** * Generates a serialized Json string of the entire Event * diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 62b31ad0c5..642a67c917 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -283,6 +284,16 @@ public void delete(final String key) { } } + @Override + public void clear() { + // Delete all entries from the event + Iterator iter = toMap().keySet().iterator(); + JsonNode baseNode = jsonNode; + while (iter.hasNext()) { + ((ObjectNode) baseNode).remove((String)iter.next()); + } + } + @Override public String toJsonString() { return jsonNode.toString(); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java index 99ffd71259..f2ed5b6a12 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java @@ -289,6 +289,23 @@ public void testDeleteKey(final String key) { assertThat(result, is(nullValue())); } + @Test + public void testClear() { + event.put("key1", UUID.randomUUID()); + event.put("key2", UUID.randomUUID()); + event.put("key3/key4", UUID.randomUUID()); + event.clear(); + UUID result = event.get("key1", UUID.class); + assertThat(result, is(nullValue())); + result = event.get("key2", UUID.class); + assertThat(result, is(nullValue())); + result = event.get("key3", UUID.class); + assertThat(result, is(nullValue())); + result = event.get("key3/key4", UUID.class); + assertThat(result, is(nullValue())); + assertThat(event.toMap().size(), equalTo(0)); + } + @ParameterizedTest @ValueSource(strings = {"/", "foo", "/foo", "/foo/bar", "foo/bar", "foo/bar/", "/foo/bar/leaf/key"}) public void testDelete_withNonexistentKey(final String key) { diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessor.java index 7c6ddc3f88..af8de2fe7a 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessor.java @@ -9,23 +9,21 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; @DataPrepperPlugin(name = "select_entries", pluginType = Processor.class, pluginConfigurationType = SelectEntriesProcessorConfig.class) public class SelectEntriesProcessor extends AbstractProcessor, Record> { - private final List entries; + private final List keysToInclude; private final String selectWhen; private final ExpressionEvaluator expressionEvaluator; @@ -33,8 +31,14 @@ public class SelectEntriesProcessor extends AbstractProcessor, Rec @DataPrepperPluginConstructor public SelectEntriesProcessor(final PluginMetrics pluginMetrics, final SelectEntriesProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { super(pluginMetrics); - this.entries = Arrays.asList(config.getIncludeKeys()); this.selectWhen = config.getSelectWhen(); + if (selectWhen != null + && !expressionEvaluator.isValidExpressionStatement(selectWhen)) { + throw new InvalidPluginConfigurationException( + String.format("select_when value of %s is not a valid expression statement. " + + "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax.", selectWhen)); + } + this.keysToInclude = config.getIncludeKeys(); this.expressionEvaluator = expressionEvaluator; } @@ -49,18 +53,13 @@ public Collection> doExecute(final Collection> recor // To handle nested case, just get the values and store // in a temporary map. Map outMap = new HashMap<>(); - for (String entry: entries) { - Object val = recordEvent.get(entry, Object.class); - if (val != null) { - outMap.put(entry, val); + for (String keyToInclude: keysToInclude) { + Object value = recordEvent.get(keyToInclude, Object.class); + if (value != null) { + outMap.put(keyToInclude, value); } } - // Delete all entries from the event - Set keysToDelete = recordEvent.toMap().keySet(); - Iterator iter = keysToDelete.iterator(); - while (iter.hasNext()) { - recordEvent.delete((String)iter.next()); - } + recordEvent.clear(); // add back only the keys selected for (Map.Entry entry: outMap.entrySet()) { diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessorConfig.java index 0e8b1a428c..e19723f20d 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessorConfig.java @@ -9,16 +9,18 @@ import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; +import java.util.List; + public class SelectEntriesProcessorConfig { @NotEmpty @NotNull @JsonProperty("include_keys") - private String[] includeKeys; + private List includeKeys; @JsonProperty("select_when") private String selectWhen; - public String[] getIncludeKeys() { + public List getIncludeKeys() { return includeKeys; } diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessorTests.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessorTests.java index 9f4da55a8a..048f304582 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessorTests.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessorTests.java @@ -10,10 +10,12 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.Collections; import java.util.HashMap; @@ -38,8 +40,8 @@ public class SelectEntriesProcessorTests { private ExpressionEvaluator expressionEvaluator; @Test - public void testSelectEntriesProcessorTest() { - when(mockConfig.getIncludeKeys()).thenReturn(new String[] { "key1", "key2"}); + public void testSelectEntriesProcessor() { + when(mockConfig.getIncludeKeys()).thenReturn(List.of("key1", "key2")); when(mockConfig.getSelectWhen()).thenReturn(null); final SelectEntriesProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -58,8 +60,8 @@ public void testSelectEntriesProcessorTest() { } @Test - public void testWithKeyDneSelectEntriesProcessorTest() { - when(mockConfig.getIncludeKeys()).thenReturn(new String[] { "key1", "key2"}); + public void testWithKeyDneSelectEntriesProcessor() { + when(mockConfig.getIncludeKeys()).thenReturn(List.of("key1", "key2")); when(mockConfig.getSelectWhen()).thenReturn(null); final SelectEntriesProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -72,9 +74,19 @@ public void testWithKeyDneSelectEntriesProcessorTest() { } @Test - public void testSelectEntriesProcessorWithConditionTest() { - when(mockConfig.getIncludeKeys()).thenReturn(new String[] { "key1", "key2"}); - final String selectWhen = UUID.randomUUID().toString(); + public void testSelectEntriesProcessorWithInvalidCondition() { + final String selectWhen = "/message == \""+UUID.randomUUID().toString()+"\""; + when(expressionEvaluator.isValidExpressionStatement(selectWhen)).thenReturn(false); + when(mockConfig.getSelectWhen()).thenReturn(selectWhen); + assertThrows(InvalidPluginConfigurationException.class, () -> createObjectUnderTest()); + final Record record = getEvent("thisisamessage"); + } + + @Test + public void testSelectEntriesProcessorWithCondition() { + when(mockConfig.getIncludeKeys()).thenReturn(List.of("key1", "key2")); + final String selectWhen = "/message == \""+UUID.randomUUID().toString()+"\""; + when(expressionEvaluator.isValidExpressionStatement(selectWhen)).thenReturn(true); when(mockConfig.getSelectWhen()).thenReturn(selectWhen); final SelectEntriesProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -94,8 +106,8 @@ public void testSelectEntriesProcessorWithConditionTest() { } @Test - public void testNestedSelectEntriesProcessorTest() { - when(mockConfig.getIncludeKeys()).thenReturn(new String[] { "nested/key1", "nested/nested2/key2"}); + public void testNestedSelectEntriesProcessor() { + when(mockConfig.getIncludeKeys()).thenReturn(List.of("nested/key1", "nested/nested2/key2")); when(mockConfig.getSelectWhen()).thenReturn(null); final String value1 = UUID.randomUUID().toString(); final String value2 = UUID.randomUUID().toString();