Skip to content

Commit

Permalink
Add flatten processor (opensearch-project#4138)
Browse files Browse the repository at this point in the history
* Initial commit

Signed-off-by: Hai Yan <[email protected]>

* Add complete functionality and tests

Signed-off-by: Hai Yan <[email protected]>

* Add test cases with dots in keys

Signed-off-by: Hai Yan <[email protected]>

* Update JacksonEvent tests

Signed-off-by: Hai Yan <[email protected]>

* Rename project to flatten-processor

Signed-off-by: Hai Yan <[email protected]>

* Rename flattenjson to flatten

Signed-off-by: Hai Yan <[email protected]>

* Address review comments

Signed-off-by: Hai Yan <[email protected]>

---------

Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh authored Feb 28, 2024
1 parent a0b6f25 commit 75d7848
Show file tree
Hide file tree
Showing 8 changed files with 541 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,9 @@ private static boolean isValidKey(final String key) {
|| c == '-'
|| c == '_'
|| c == '@'
|| c == '/')) {
|| c == '/'
|| c == '['
|| c == ']')) {

return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void testPutAndGet_withRandomString() {
}

@ParameterizedTest
@ValueSource(strings = {"/", "foo", "foo-bar", "foo_bar", "foo.bar", "/foo", "/foo/", "a1K.k3-01_02"})
@ValueSource(strings = {"/", "foo", "foo-bar", "foo_bar", "foo.bar", "/foo", "/foo/", "a1K.k3-01_02", "keyWithBrackets[]"})
void testPutAndGet_withStrings(final String key) {
final UUID value = UUID.randomUUID();

Expand Down Expand Up @@ -280,7 +280,7 @@ public void testOverwritingExistingKey() {
}

@ParameterizedTest
@ValueSource(strings = {"/", "foo", "/foo", "/foo/bar", "foo/bar", "foo/bar/", "/foo/bar/leaf/key"})
@ValueSource(strings = {"/", "foo", "/foo", "/foo/bar", "foo/bar", "foo/bar/", "/foo/bar/leaf/key", "keyWithBrackets[]"})
public void testDeleteKey(final String key) {
event.put(key, UUID.randomUUID());
event.delete(key);
Expand Down Expand Up @@ -367,7 +367,7 @@ public void testIsValueAList_withNull() {

@ParameterizedTest
@ValueSource(strings = {"withSpecialChars*$%", "\\-withEscapeChars", "\\\\/withMultipleEscapeChars",
"with,Comma", "with:Colon", "with[Bracket", "with|Brace"})
"with,Comma", "with:Colon", "with|Brace"})
void testKey_withInvalidKey_throwsIllegalArgumentException(final String invalidKey) {
assertThrowsForKeyCheck(IllegalArgumentException.class, invalidKey);
}
Expand Down
22 changes: 22 additions & 0 deletions data-prepper-plugins/flatten-processor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule {
limit {
minimum = 0.9
}
}
}
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.github.wnameless.json:json-flattener:0.16.6'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.flatten;

import com.github.wnameless.json.flattener.JsonFlattener;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@DataPrepperPlugin(name = "flatten", pluginType = Processor.class, pluginConfigurationType = FlattenProcessorConfig.class)
public class FlattenProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(FlattenProcessor.class);

private static final String SEPARATOR = "/";
private final FlattenProcessorConfig config;
private final ExpressionEvaluator expressionEvaluator;

@DataPrepperPluginConstructor
public FlattenProcessor(final PluginMetrics pluginMetrics, final FlattenProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.config = config;
this.expressionEvaluator = expressionEvaluator;
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for (final Record<Event> record : records) {
final Event recordEvent = record.getData();

try {
if (config.getFlattenWhen() != null && !expressionEvaluator.evaluateConditional(config.getFlattenWhen(), recordEvent)) {
continue;
}

final String sourceJson = recordEvent.getAsJsonString(config.getSource());

// adds ignoreReservedCharacters() so that dots in keys are ignored during flattening
// e.g., {"a.b": {"c": 1}} will be flattened as expected: {"a.b.c": 1}; otherwise, flattened result will be {"[\"a.b\"]c": 1}
Map<String, Object> flattenedJson = new JsonFlattener(sourceJson).ignoreReservedCharacters().flattenAsMap();

if (config.isRemoveProcessedFields()) {
final Map<String, Object> sourceMap = recordEvent.get(config.getSource(), Map.class);
for (final String keyInSource : sourceMap.keySet()) {
recordEvent.delete(getJsonPointer(config.getSource(), keyInSource));
}
}

if (config.isRemoveListIndices()) {
flattenedJson = removeListIndicesInKeys(flattenedJson);
}

updateEvent(recordEvent, flattenedJson);
} catch (Exception e) {
LOG.error("Fail to perform flatten operation", e);
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
}
}
return records;
}

@Override
public void prepareForShutdown() {
}

@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {
}

private String getJsonPointer(final String outerKey, final String innerKey) {
if (outerKey.isEmpty()) {
return SEPARATOR + innerKey;
} else {
return SEPARATOR + outerKey + SEPARATOR + innerKey;
}
}

private Map<String, Object> removeListIndicesInKeys(final Map<String, Object> inputMap) {
final Map<String, Object> resultMap = new HashMap<>();

for (final Map.Entry<String, Object> entry : inputMap.entrySet()) {
final String keyWithoutIndices = removeListIndices(entry.getKey());
addFieldsToMapWithMerge(keyWithoutIndices, entry.getValue(), resultMap);
}
return resultMap;
}

private String removeListIndices(final String key) {
return key.replaceAll("\\[\\d+\\]", "[]");
}

private void addFieldsToMapWithMerge(String key, Object value, Map<String, Object> map) {
if (!map.containsKey(key)) {
map.put(key, value);
} else {
final Object currentValue = map.get(key);
if (currentValue instanceof List) {
((List<Object>)currentValue).add(value);
} else {
List<Object> newValue = new ArrayList<>();
newValue.add(currentValue);
newValue.add(value);
map.put(key, newValue);
}
}
}

private void updateEvent(Event recordEvent, Map<String, Object> flattenedJson) {
if (config.getTarget().isEmpty()) {
// Target is root
for (final Map.Entry<String, Object> entry : flattenedJson.entrySet()) {
recordEvent.put(entry.getKey(), entry.getValue());
}
} else {
recordEvent.put(config.getTarget(), flattenedJson);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.flatten;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;

import java.util.List;

public class FlattenProcessorConfig {

@NotNull
@JsonProperty("source")
private String source;

@NotNull
@JsonProperty("target")
private String target;

@JsonProperty("remove_processed_fields")
private boolean removeProcessedFields = false;

@JsonProperty("remove_list_indices")
private boolean removeListIndices = false;

@JsonProperty("flatten_when")
private String flattenWhen;

@JsonProperty("tags_on_failure")
private List<String> tagsOnFailure;

public String getSource() {
return source;
}

public String getTarget() {
return target;
}

public boolean isRemoveProcessedFields() {
return removeProcessedFields;
}

public boolean isRemoveListIndices() {
return removeListIndices;
}

public String getFlattenWhen() {
return flattenWhen;
}

public List<String> getTagsOnFailure() {
return tagsOnFailure;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.flatten;

import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

class FlattenProcessorConfigTest {
@Test
void testDefaultConfig() {
final FlattenProcessorConfig FlattenProcessorConfig = new FlattenProcessorConfig();

assertThat(FlattenProcessorConfig.getSource(), equalTo(null));
assertThat(FlattenProcessorConfig.getTarget(), equalTo(null));
assertThat(FlattenProcessorConfig.isRemoveListIndices(), equalTo(false));
assertThat(FlattenProcessorConfig.isRemoveListIndices(), equalTo(false));
assertThat(FlattenProcessorConfig.getFlattenWhen(), equalTo(null));
assertThat(FlattenProcessorConfig.getTagsOnFailure(), equalTo(null));
}
}
Loading

0 comments on commit 75d7848

Please sign in to comment.