Skip to content

Commit

Permalink
Add support for multiple entries of 'with_keys' with 'delete_when' co…
Browse files Browse the repository at this point in the history
…ndition in delete_entries processor

Signed-off-by: Niketan Chandarana <[email protected]>
  • Loading branch information
niketan16 committed Jan 24, 2025
1 parent 19d9dae commit bb65af0
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 20 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/mutate-event-processors/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation project(':data-prepper-test-event')
testImplementation testLibs.slf4j.simple
testImplementation testLibs.spring.test
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,42 @@
public class DeleteEntryProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(DeleteEntryProcessor.class);
private final List<EventKey> entries;
private final List<EventKey> withKeys;
private final String deleteWhen;
private final List<DeleteEntryProcessorConfig.Entry> entries;

private final ExpressionEvaluator expressionEvaluator;

@DataPrepperPluginConstructor
public DeleteEntryProcessor(final PluginMetrics pluginMetrics, final DeleteEntryProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.entries = config.getWithKeys();
this.withKeys = config.getWithKeys();
this.deleteWhen = config.getDeleteWhen();
this.expressionEvaluator = expressionEvaluator;

if (deleteWhen != null
&& !expressionEvaluator.isValidExpressionStatement(deleteWhen)) {
throw new InvalidPluginConfigurationException(
String.format("delete_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", deleteWhen));
&& !expressionEvaluator.isValidExpressionStatement(deleteWhen)) {
throw new InvalidPluginConfigurationException(
String.format("delete_when %s is not a valid expression statement. See https://opensearch" +
".org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", deleteWhen));
}

if (this.withKeys != null && !this.withKeys.isEmpty()) {
DeleteEntryProcessorConfig.Entry entry = new DeleteEntryProcessorConfig.Entry(this.withKeys, this.deleteWhen);
this.entries = List.of(entry);
} else {
this.entries = config.getEntries();
}

this.entries.forEach(entry -> {
if (entry.getDeleteWhen() != null
&& !expressionEvaluator.isValidExpressionStatement(entry.getDeleteWhen())) {
throw new InvalidPluginConfigurationException(
String.format("delete_when %s is not a valid expression statement. See https://opensearch" +
".org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
entry.getDeleteWhen()));
}
});
}

@Override
Expand All @@ -53,13 +72,15 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
final Event recordEvent = record.getData();

try {
if (Objects.nonNull(deleteWhen) && !expressionEvaluator.evaluateConditional(deleteWhen, recordEvent)) {
continue;
}


for (final EventKey entry : entries) {
recordEvent.delete(entry);
for (final DeleteEntryProcessorConfig.Entry entry : entries) {
if (Objects.nonNull(entry.getDeleteWhen()) && !expressionEvaluator.evaluateConditional(entry.getDeleteWhen(),
recordEvent)) {
continue;
}

for (final EventKey key : entry.getWithKeys()) {
recordEvent.delete(key);
}
}
} catch (final Exception e) {
LOG.atError()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertFalse;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.model.annotations.ConditionalRequired;
import org.opensearch.dataprepper.model.annotations.ConditionalRequired.IfThenElse;
import org.opensearch.dataprepper.model.annotations.ConditionalRequired.SchemaProperty;
import org.opensearch.dataprepper.model.annotations.ExampleValues;
import org.opensearch.dataprepper.model.annotations.ExampleValues.Example;
import org.opensearch.dataprepper.model.event.EventKey;
Expand All @@ -19,31 +25,90 @@

import java.util.List;

@ConditionalRequired(value = {
@IfThenElse(
ifFulfilled = {@SchemaProperty(field = "entries", value = "null")},
thenExpect = {@SchemaProperty(field = "with_keys")}
),
@IfThenElse(
ifFulfilled = {@SchemaProperty(field = "with_keys", value = "null")},
thenExpect = {@SchemaProperty(field = "entries")}
)
})
@JsonPropertyOrder
@JsonClassDescription("The <code>delete_entries</code> processor deletes fields from events. " +
"You can define the keys you want to delete in the <code>with_keys</code> configuration. " +
"Those keys and their values are deleted from events.")
public class DeleteEntryProcessorConfig {
@NotEmpty
@NotNull

@JsonPropertyOrder
public static class Entry {
@NotEmpty
@NotNull
@JsonProperty("with_keys")
@EventKeyConfiguration(EventKeyFactory.EventAction.DELETE)
@JsonPropertyDescription("A list of keys to be deleted.")
private List<@NotNull @NotEmpty EventKey> withKeys;

@JsonProperty("delete_when")
@JsonPropertyDescription("Specifies under what condition the deletion should be performed. " +
"By default, keys are always deleted. Example: <code>/mykey == \"---\"</code>")
@ExampleValues({
@Example(value = "/some_key == null", description = "Only runs the deletion if the key some_key is null or does not exist.")
})
private String deleteWhen;

public List<EventKey> getWithKeys() {
return withKeys;
}

public String getDeleteWhen() {
return deleteWhen;
}

public Entry(final List<EventKey> withKeys, final String deleteWhen) {
this.withKeys = withKeys;
this.deleteWhen = deleteWhen;
}

public Entry() {

}
}

@JsonProperty("with_keys")
@EventKeyConfiguration(EventKeyFactory.EventAction.DELETE)
@JsonPropertyDescription("A list of keys to be deleted.")
@JsonPropertyDescription("A list of keys to be deleted (legacy format).")
private List<@NotNull @NotEmpty EventKey> withKeys;

@JsonProperty("delete_when")
@JsonPropertyDescription("Specifies under what condition the <code>delete_entries</code> processor should perform deletion. " +
"By default, keys are always deleted. Example: <code>/mykey == \"---\"</code>")
@ExampleValues({
@Example(value = "/some_key == null", description = "Only runs the delete_entries processor on the Event if the key some_key is null or does not exist.")
})
@JsonPropertyDescription("Specifies under what condition the deletion should be performed (legacy format).")
private String deleteWhen;

@Valid
@JsonProperty("entries")
@JsonPropertyDescription("A list of entries to delete from the event.")
private List<Entry> entries;

@AssertTrue(message = "Either 'entries' or 'with_keys' must be specified, but neither was found")
boolean isConfigurationPresent() {
return entries != null || withKeys != null;
}

@AssertFalse(message = "Either use 'entries' OR 'with_keys' with 'delete_when' configuration, but not both")
boolean hasBothConfigurations() {
return entries != null && withKeys != null;
}

public List<EventKey> getWithKeys() {
return withKeys;
}

public String getDeleteWhen() {
return deleteWhen;
}

public List<Entry> getEntries() {
return entries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.record.Record;
import org.springframework.test.util.ReflectionTestUtils;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -134,6 +135,81 @@ public void testNestedDeleteProcessorTest() {
assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
}

@Test
public void test_multiple_entries_with_different_delete_when_conditions() {
final DeleteEntryProcessorConfig.Entry entry1 = new DeleteEntryProcessorConfig.Entry(List.of(eventKeyFactory.createEventKey("key1"
, EventKeyFactory.EventAction.DELETE)), "condition1");
final DeleteEntryProcessorConfig.Entry entry2 = new DeleteEntryProcessorConfig.Entry(List.of(eventKeyFactory.createEventKey("key2"
, EventKeyFactory.EventAction.DELETE)), "condition2");

when(mockConfig.getEntries()).thenReturn(List.of(entry1, entry2));
when(expressionEvaluator.isValidExpressionStatement("condition1")).thenReturn(true);
when(expressionEvaluator.isValidExpressionStatement("condition2")).thenReturn(true);

final DeleteEntryProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("test");
record.getData().put("key1", "value1");
record.getData().put("key2", "value2");

when(expressionEvaluator.evaluateConditional("condition1", record.getData())).thenReturn(true);
when(expressionEvaluator.evaluateConditional("condition2", record.getData())).thenReturn(false);

final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertThat(editedRecords.get(0).getData().containsKey("key1"), is(false));
assertThat(editedRecords.get(0).getData().containsKey("key2"), is(true));
}

@Test
public void test_legacy_format_conversion_to_entries_format() {
when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey("message", EventKeyFactory.EventAction.DELETE)));
when(mockConfig.getDeleteWhen()).thenReturn("condition");
when(expressionEvaluator.isValidExpressionStatement("condition")).thenReturn(true);

final DeleteEntryProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("test");

when(expressionEvaluator.evaluateConditional("condition", record.getData())).thenReturn(true);

final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertThat(editedRecords.get(0).getData().containsKey("message"), is(false));
}

@Test
public void invalid_delete_when_with_entries_format_throws_InvalidPluginConfigurationException() {
DeleteEntryProcessorConfig.Entry entry = new DeleteEntryProcessorConfig.Entry(List.of(eventKeyFactory.createEventKey("key1",
EventKeyFactory.EventAction.DELETE)), "invalid_condition");

when(mockConfig.getEntries()).thenReturn(List.of(entry));
when(expressionEvaluator.isValidExpressionStatement("invalid_condition")).thenReturn(false);

assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest);
}

@Test
public void test_both_configurations_used_together() {
final DeleteEntryProcessorConfig configObjectUnderTest = new DeleteEntryProcessorConfig();
final DeleteEntryProcessorConfig.Entry entry = new DeleteEntryProcessorConfig.Entry(List.of(eventKeyFactory.createEventKey("key1"
, EventKeyFactory.EventAction.DELETE)), "condition");

ReflectionTestUtils.setField(configObjectUnderTest, "withKeys", List.of(eventKeyFactory.createEventKey("message",
EventKeyFactory.EventAction.DELETE)));
ReflectionTestUtils.setField(configObjectUnderTest, "entries", List.of(entry));

assertThat(configObjectUnderTest.hasBothConfigurations(), is(true));
}

@Test
public void test_no_configuration_used() {
final DeleteEntryProcessorConfig configObjectUnderTest = new DeleteEntryProcessorConfig();

ReflectionTestUtils.setField(configObjectUnderTest, "withKeys", null);
ReflectionTestUtils.setField(configObjectUnderTest, "entries", null);

assertThat(configObjectUnderTest.isConfigurationPresent(), is(false));
}

private DeleteEntryProcessor createObjectUnderTest() {
return new DeleteEntryProcessor(pluginMetrics, mockConfig, expressionEvaluator);
}
Expand Down

0 comments on commit bb65af0

Please sign in to comment.