Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Feb 19, 2024
1 parent da45284 commit 37cb242
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ public interface Event extends Serializable {
*/
void delete(String key);

/**
* Delete all keys from the Event
* @since 2.8
*/
void clear();

/**
* Generates a serialized Json string of the entire Event
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -283,6 +284,16 @@ public void delete(final String key) {
}
}

@Override
public void clear() {
// Delete all entries from the event
Iterator iter = toMap().keySet().iterator();
JsonNode baseNode = jsonNode;
while (iter.hasNext()) {
((ObjectNode) baseNode).remove((String)iter.next());
}
}

@Override
public String toJsonString() {
return jsonNode.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,23 @@ public void testDeleteKey(final String key) {
assertThat(result, is(nullValue()));
}

@Test
public void testClear() {
event.put("key1", UUID.randomUUID());
event.put("key2", UUID.randomUUID());
event.put("key3/key4", UUID.randomUUID());
event.clear();
UUID result = event.get("key1", UUID.class);
assertThat(result, is(nullValue()));
result = event.get("key2", UUID.class);
assertThat(result, is(nullValue()));
result = event.get("key3", UUID.class);
assertThat(result, is(nullValue()));
result = event.get("key3/key4", UUID.class);
assertThat(result, is(nullValue()));
assertThat(event.toMap().size(), equalTo(0));
}

@ParameterizedTest
@ValueSource(strings = {"/", "foo", "/foo", "/foo/bar", "foo/bar", "foo/bar/", "/foo/bar/leaf/key"})
public void testDelete_withNonexistentKey(final String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,36 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

@DataPrepperPlugin(name = "select_entries", pluginType = Processor.class, pluginConfigurationType = SelectEntriesProcessorConfig.class)
public class SelectEntriesProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private final List<String> entries;
private final List<String> keysToInclude;
private final String selectWhen;

private final ExpressionEvaluator expressionEvaluator;

@DataPrepperPluginConstructor
public SelectEntriesProcessor(final PluginMetrics pluginMetrics, final SelectEntriesProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.entries = Arrays.asList(config.getIncludeKeys());
this.selectWhen = config.getSelectWhen();
if (selectWhen != null
&& !expressionEvaluator.isValidExpressionStatement(selectWhen)) {
throw new InvalidPluginConfigurationException(
String.format("select_when value of %s is not a valid expression statement. " +
"See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax.", selectWhen));
}
this.keysToInclude = config.getIncludeKeys();
this.expressionEvaluator = expressionEvaluator;
}

Expand All @@ -49,18 +53,13 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
// To handle nested case, just get the values and store
// in a temporary map.
Map<String, Object> outMap = new HashMap<>();
for (String entry: entries) {
Object val = recordEvent.get(entry, Object.class);
if (val != null) {
outMap.put(entry, val);
for (String keyToInclude: keysToInclude) {
Object value = recordEvent.get(keyToInclude, Object.class);
if (value != null) {
outMap.put(keyToInclude, value);
}
}
// Delete all entries from the event
Set keysToDelete = recordEvent.toMap().keySet();
Iterator iter = keysToDelete.iterator();
while (iter.hasNext()) {
recordEvent.delete((String)iter.next());
}
recordEvent.clear();

// add back only the keys selected
for (Map.Entry<String, Object> entry: outMap.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

import java.util.List;

public class SelectEntriesProcessorConfig {
@NotEmpty
@NotNull
@JsonProperty("include_keys")
private String[] includeKeys;
private List<String> includeKeys;

@JsonProperty("select_when")
private String selectWhen;

public String[] getIncludeKeys() {
public List<String> getIncludeKeys() {
return includeKeys;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.Collections;
import java.util.HashMap;
Expand All @@ -38,8 +40,8 @@ public class SelectEntriesProcessorTests {
private ExpressionEvaluator expressionEvaluator;

@Test
public void testSelectEntriesProcessorTest() {
when(mockConfig.getIncludeKeys()).thenReturn(new String[] { "key1", "key2"});
public void testSelectEntriesProcessor() {
when(mockConfig.getIncludeKeys()).thenReturn(List.of("key1", "key2"));
when(mockConfig.getSelectWhen()).thenReturn(null);
final SelectEntriesProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
Expand All @@ -58,8 +60,8 @@ public void testSelectEntriesProcessorTest() {
}

@Test
public void testWithKeyDneSelectEntriesProcessorTest() {
when(mockConfig.getIncludeKeys()).thenReturn(new String[] { "key1", "key2"});
public void testWithKeyDneSelectEntriesProcessor() {
when(mockConfig.getIncludeKeys()).thenReturn(List.of("key1", "key2"));
when(mockConfig.getSelectWhen()).thenReturn(null);
final SelectEntriesProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
Expand All @@ -72,9 +74,19 @@ public void testWithKeyDneSelectEntriesProcessorTest() {
}

@Test
public void testSelectEntriesProcessorWithConditionTest() {
when(mockConfig.getIncludeKeys()).thenReturn(new String[] { "key1", "key2"});
final String selectWhen = UUID.randomUUID().toString();
public void testSelectEntriesProcessorWithInvalidCondition() {
final String selectWhen = "/message == \""+UUID.randomUUID().toString()+"\"";
when(expressionEvaluator.isValidExpressionStatement(selectWhen)).thenReturn(false);
when(mockConfig.getSelectWhen()).thenReturn(selectWhen);
assertThrows(InvalidPluginConfigurationException.class, () -> createObjectUnderTest());
final Record<Event> record = getEvent("thisisamessage");
}

@Test
public void testSelectEntriesProcessorWithCondition() {
when(mockConfig.getIncludeKeys()).thenReturn(List.of("key1", "key2"));
final String selectWhen = "/message == \""+UUID.randomUUID().toString()+"\"";
when(expressionEvaluator.isValidExpressionStatement(selectWhen)).thenReturn(true);
when(mockConfig.getSelectWhen()).thenReturn(selectWhen);
final SelectEntriesProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
Expand All @@ -94,8 +106,8 @@ public void testSelectEntriesProcessorWithConditionTest() {
}

@Test
public void testNestedSelectEntriesProcessorTest() {
when(mockConfig.getIncludeKeys()).thenReturn(new String[] { "nested/key1", "nested/nested2/key2"});
public void testNestedSelectEntriesProcessor() {
when(mockConfig.getIncludeKeys()).thenReturn(List.of("nested/key1", "nested/nested2/key2"));
when(mockConfig.getSelectWhen()).thenReturn(null);
final String value1 = UUID.randomUUID().toString();
final String value2 = UUID.randomUUID().toString();
Expand Down

0 comments on commit 37cb242

Please sign in to comment.