Skip to content

Commit

Permalink
Introducing delete input configuration option for some parsers
Browse files Browse the repository at this point in the history
  • Loading branch information
san81 committed Jul 1, 2024
1 parent af6bce4 commit 8218740
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public abstract class AbstractParseProcessor extends AbstractProcessor<Record<Ev
private final String parseWhen;
private final List<String> tagsOnFailure;
private final boolean overwriteIfDestinationExists;
private final boolean deleteSourceRequested;

private final ExpressionEvaluator expressionEvaluator;

Expand All @@ -50,6 +51,7 @@ protected AbstractParseProcessor(PluginMetrics pluginMetrics,
parseWhen = commonParseConfig.getParseWhen();
tagsOnFailure = commonParseConfig.getTagsOnFailure();
overwriteIfDestinationExists = commonParseConfig.getOverwriteIfDestinationExists();
deleteSourceRequested = commonParseConfig.isDeleteSourceRequested();
this.expressionEvaluator = expressionEvaluator;
}

Expand Down Expand Up @@ -93,6 +95,10 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
} else if (overwriteIfDestinationExists || !event.containsKey(destination)) {
event.put(destination, parsedValue);
}

if(deleteSourceRequested) {
event.delete(this.source);
}
} catch (Exception e) {
LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,12 @@ public interface CommonParseConfig {
* Defaults to true.
*/
boolean getOverwriteIfDestinationExists();

/**
* An optional setting used to request dropping the original raw message after successfully parsing the input event.
* Defaults to false.
*/
default boolean isDeleteSourceRequested() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class ParseJsonProcessorConfig implements CommonParseConfig {
@JsonProperty("overwrite_if_destination_exists")
private boolean overwriteIfDestinationExists = true;

@JsonProperty
private boolean deleteSource = false;

@Override
public String getSource() {
return source;
Expand Down Expand Up @@ -63,11 +66,16 @@ public boolean getOverwriteIfDestinationExists() {
return overwriteIfDestinationExists;
}

@Override
public boolean isDeleteSourceRequested() {
return deleteSource;
}

@AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)")
boolean isValidDestination() {
if (Objects.isNull(destination)) return true;

final String trimmedDestination = destination.trim();
return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/"));
return !trimmedDestination.isEmpty() && !(trimmedDestination.equals("/"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ void test_when_using_ion_features_then_processorParsesCorrectly() {
final String serializedMessage = "{bareKey: 1, symbol: SYMBOL, timestamp: 2023-11-30T21:05:23.383Z, attribute: dollars::100.0 }";
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);

assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(true));
assertThat(parsedEvent.get("bareKey", Integer.class), equalTo(1));
assertThat(parsedEvent.get("symbol", String.class), equalTo("SYMBOL"));
assertThat(parsedEvent.get("timestamp", String.class), equalTo("2023-11-30T21:05:23.383Z"));
assertThat(parsedEvent.get("attribute", Double.class), equalTo(100.0));
}

@Test
void test_when_deleteSourceFlagEnabled() {
when(processorConfig.isDeleteSourceRequested()).thenReturn(true);
parseJsonProcessor = new ParseIonProcessor(pluginMetrics, ionProcessorConfig, expressionEvaluator);

final String serializedMessage = "{bareKey: 1, symbol: SYMBOL, timestamp: 2023-11-30T21:05:23.383Z, attribute: dollars::100.0 }";
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);

assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(false));
assertThat(parsedEvent.get("bareKey", Integer.class), equalTo(1));
assertThat(parsedEvent.get("symbol", String.class), equalTo("SYMBOL"));
assertThat(parsedEvent.get("timestamp", String.class), equalTo("2023-11-30T21:05:23.383Z"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public void test_when_defaultParseJsonProcessorConfig_then_returns_default_value
assertThat(objectUnderTest.getPointer(), equalTo(null));
assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null));
assertThat(objectUnderTest.getOverwriteIfDestinationExists(), equalTo(true));
assertThat(objectUnderTest.isDeleteSourceRequested(), equalTo(false));
}

@Nested
Expand Down Expand Up @@ -57,6 +58,9 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse(
setField(ParseJsonProcessorConfig.class, config, "tagsOnFailure", tagsList);

assertThat(config.getTagsOnFailure(), equalTo(tagsList));

setField(ParseJsonProcessorConfig.class, config, "deleteSource", true);
assertThat(config.isDeleteSourceRequested(), equalTo(true));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,22 @@ void test_when_nestedJSONArray_then_parsedIntoArrayAndIndicesAccessible() {
assertThat(parsedEvent.get(pointerToFirstElement, String.class), equalTo(value.get(0)));
}

@Test
void test_when_deleteSourceFlagEnabled() {
when(processorConfig.isDeleteSourceRequested()).thenReturn(true);
parseJsonProcessor = new ParseJsonProcessor(pluginMetrics, jsonProcessorConfig, expressionEvaluator);

final String key = "key";
final ArrayList<String> value = new ArrayList<>(List.of("Element0","Element1","Element2"));
final String jsonArray = "{\"key\":[\"Element0\",\"Element1\",\"Element2\"]}";
final Event parsedEvent = createAndParseMessageEvent(jsonArray);

assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(false));
assertThat(parsedEvent.get(key, ArrayList.class), equalTo(value));
final String pointerToFirstElement = key + "/0";
assertThat(parsedEvent.get(pointerToFirstElement, String.class), equalTo(value.get(0)));
}

@Test
void test_when_nestedJSONArrayOfJSON_then_parsedIntoArrayAndIndicesAccessible() {
parseJsonProcessor = createObjectUnderTest();
Expand Down Expand Up @@ -373,23 +389,21 @@ private String constructDeeplyNestedJsonPointer(final int numberOfLayers) {

/**
* Naive serialization that converts every = to : and wraps every word with double quotes (no error handling or input validation).
* @param messageMap
* @return
* @param messageMap source key value map
* @return serialized string representation of the map
*/
private String convertMapToJSONString(final Map<String, Object> messageMap) {
final String replaceEquals = messageMap.toString().replace("=",":");
final String addQuotes = replaceEquals.replaceAll("(\\w+)", "\"$1\""); // wrap every word in quotes
return addQuotes;
return replaceEquals.replaceAll("(\\w+)", "\"$1\"");
}

/**
* Creates a Map that maps a single key to a value nested numberOfLayers layers deep.
* @param numberOfLayers
* @return
* @param numberOfLayers indicates the depth of layers count
* @return a Map representing the nested structure
*/
private Map<String, Object> constructArbitrarilyDeepJsonMap(final int numberOfLayers) {
final Map<String, Object> result = Collections.singletonMap(DEEPLY_NESTED_KEY_NAME,deepJsonMapHelper(0,numberOfLayers));
return result;
return Collections.singletonMap(DEEPLY_NESTED_KEY_NAME,deepJsonMapHelper(0,numberOfLayers));
}

private Object deepJsonMapHelper(final int currentLayer, final int numberOfLayers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ void test_when_using_xml_features_then_processorParsesCorrectly() {
assertThat(parsedEvent.get("age", String.class), equalTo("30"));
}

@Test
void test_when_deleteSourceFlagEnabled() {

final String tagOnFailure = UUID.randomUUID().toString();
when(processorConfig.getTagsOnFailure()).thenReturn(List.of(tagOnFailure));
when(processorConfig.isDeleteSourceRequested()).thenReturn(true);

parseXmlProcessor = createObjectUnderTest();

final String serializedMessage = "<Person><name>John Doe</name><age>30</age></Person>";
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);
assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(false));
assertThat(parsedEvent.get("name", String.class), equalTo("John Doe"));
assertThat(parsedEvent.get("age", String.class), equalTo("30"));
}

@Test
void test_when_using_invalid_xml_tags_correctly() {

Expand Down

0 comments on commit 8218740

Please sign in to comment.