Commit
Add parse_ion processor (#3803)

Signed-off-by: Emma Becar <[email protected]>
Co-authored-by: Emma Becar <[email protected]>
emmachase and Emma Becar authored Dec 6, 2023
1 parent d501af6 commit 0af8d4c
Showing 14 changed files with 551 additions and 89 deletions.
65 changes: 65 additions & 0 deletions data-prepper-plugins/parse-json-processor/README.md
@@ -60,6 +60,71 @@ The processor will parse the message into the following:

* `tags_on_failure` (Optional): A `List` of `String`s specifying the tags to set when the processor fails to parse or an unknown exception occurs while parsing. These tags may be used in conditional expressions in other parts of the configuration.

# Parse Ion Processor
This processor takes in an Event and parses its Ion data, including any nested fields.

> Note: Ion Timestamps are parsed into ISO-8601 Z strings, and Ion Blobs are parsed into base64 encoded strings.
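As an illustration of the note above, an Ion document containing a timestamp and a blob (the values here are made up, and the exact string form of the timestamp follows Ion's `Timestamp.toZString()`):
```
{ "created": 2023-12-06T00:00Z, "payload": {{ aGVsbG8= }} }
```
might be parsed into something like:
```
{"created": "2023-12-06T00:00Z", "payload": "aGVsbG8="}
```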
## Basic Usage
To get started, create the following `pipelines.yaml`.
```yaml
parse-ion-pipeline:
  source:
    stdin:
  processor:
    - parse_ion:
  sink:
    - stdout:
```
#### Basic Example:
If you wish to test the `parse_ion` processor with the above config, you may find the following example useful.
Run the pipeline, paste the following line into your console, and then enter `exit` on a new line.
```
{"outer_key": {"inner_key": "inner_value"}}
```
The processor will parse the message into the following:
```
{"message": "{\"outer_key\": {\"inner_key\": \"inner_value\"}}", "outer_key": {"inner_key": "inner_value"}}
```
#### Example with JSON Pointer:
If you wish to parse a selection of the Ion data, you can specify a JSON Pointer using the `pointer` option in the configuration.
The following configuration file and example demonstrates a basic pointer use case.
```yaml
parse-ion-pipeline:
  source:
    stdin:
  processor:
    - parse_ion:
        pointer: "outer_key/inner_key"
  sink:
    - stdout:
```
Run the pipeline and paste the following line into your console, and then enter `exit` on a new line.
```
{"outer_key": {"inner_key": "inner_value"}}
```

The processor will parse the message into the following:
```
{"message": "{\"outer_key\": {\"inner_key\": \"inner_value\"}}", "inner_key": "inner_value"}
```
## Configuration
* `source` (Optional) — The field in the `Event` that will be parsed.
* Default: `message`

* `destination` (Optional) — The destination field for the parsed Ion.
    * Defaults to writing to the root of the `Event` (the processor writes to the root when `destination` is `null`).
    * Cannot be `""`, `/`, or any whitespace-only `String`, because these are not valid `Event` fields.

* `pointer` (Optional) — A JSON Pointer to the field to be parsed.
* There is no `pointer` by default, meaning the entire `source` is parsed.
* The `pointer` can access JSON Array indices as well.
* If the JSON Pointer is invalid then the entire `source` data is parsed into the outgoing `Event`.
* If the pointed-to key already exists in the `Event` and the `destination` is the root, then the entire path of the key will be used.

* `tags_on_failure` (Optional): A `List` of `String`s specifying the tags to set when the processor fails to parse or an unknown exception occurs while parsing. These tags may be used in conditional expressions in other parts of the configuration.
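A combined configuration sketch using the documented options above (the option values here are illustrative, not from the commit):
```yaml
parse-ion-pipeline:
  source:
    stdin:
  processor:
    - parse_ion:
        source: "message"
        destination: "parsed"
        pointer: "outer_key/inner_key"
        tags_on_failure: ["_ion_parse_failure"]
  sink:
    - stdout:
```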

# JSON Sink/Output Codec

This is an implementation of a JSON Sink Codec that serializes Data Prepper Events into JSON objects and writes them to the underlying OutputStream.
1 change: 1 addition & 0 deletions data-prepper-plugins/parse-json-processor/build.gradle
@@ -15,6 +15,7 @@ dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion'
implementation 'org.apache.parquet:parquet-common:1.13.1'
testImplementation project(':data-prepper-test-common')
}
@@ -3,20 +3,15 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.parsejson;
package org.opensearch.dataprepper.plugins.processor.parse;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -28,12 +23,12 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

@DataPrepperPlugin(name = "parse_json", pluginType = Processor.class, pluginConfigurationType = ParseJsonProcessorConfig.class)
public class ParseJsonProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(ParseJsonProcessor.class);
public abstract class AbstractParseProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractParseProcessor.class);

private final String source;
private final String destination;
@@ -44,66 +39,70 @@ public class ParseJsonProcessor extends AbstractProcessor<Record<Event>, Record<

private final ExpressionEvaluator expressionEvaluator;

@DataPrepperPluginConstructor
public ParseJsonProcessor(final PluginMetrics pluginMetrics,
final ParseJsonProcessorConfig parseJsonProcessorConfig,
final ExpressionEvaluator expressionEvaluator) {
protected AbstractParseProcessor(PluginMetrics pluginMetrics,
CommonParseConfig commonParseConfig,
ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);

source = parseJsonProcessorConfig.getSource();
destination = parseJsonProcessorConfig.getDestination();
pointer = parseJsonProcessorConfig.getPointer();
parseWhen = parseJsonProcessorConfig.getParseWhen();
tagsOnFailure = parseJsonProcessorConfig.getTagsOnFailure();
overwriteIfDestinationExists = parseJsonProcessorConfig.getOverwriteIfDestinationExists();
source = commonParseConfig.getSource();
destination = commonParseConfig.getDestination();
pointer = commonParseConfig.getPointer();
parseWhen = commonParseConfig.getParseWhen();
tagsOnFailure = commonParseConfig.getTagsOnFailure();
overwriteIfDestinationExists = commonParseConfig.getOverwriteIfDestinationExists();
this.expressionEvaluator = expressionEvaluator;
}

/**
* @return Optional HashMap of the parsed value - empty if the message was invalid (be sure to log the error)
*/
protected abstract Optional<HashMap<String, Object>> readValue(String message, Event context);

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
final ObjectMapper objectMapper = new ObjectMapper();
final boolean doWriteToRoot = Objects.isNull(destination);
final boolean doUsePointer = Objects.nonNull(pointer);

for (final Record<Event> record : records) {

try {
final Event event = record.getData();
try {
if (Objects.nonNull(parseWhen) && !expressionEvaluator.evaluateConditional(parseWhen, event)) {
continue;
}

final String message = event.get(source, String.class);
if (Objects.isNull(message)) {
continue;
}
final TypeReference<HashMap<String, Object>> hashMapTypeReference = new TypeReference<HashMap<String, Object>>() {
};
Map<String, Object> parsedJson = objectMapper.readValue(message, hashMapTypeReference);

if (doUsePointer) {
parsedJson = parseUsingPointer(event, parsedJson, pointer, doWriteToRoot);
}

if (doWriteToRoot) {
writeToRoot(event, parsedJson);
} else if (overwriteIfDestinationExists || !event.containsKey(destination)) {
event.put(destination, parsedJson);
}
} catch (final JsonProcessingException jsonException) {
event.getMetadata().addTags(tagsOnFailure);
LOG.error(EVENT, "An exception occurred due to invalid JSON while reading event [{}]", event, jsonException);
} catch (final Exception e) {

if (Objects.nonNull(parseWhen) && !expressionEvaluator.evaluateConditional(parseWhen, event)) {
continue;
}

final String message = event.get(source, String.class);
if (Objects.isNull(message)) {
continue;
}

final Optional<HashMap<String, Object>> parsedValueOptional = readValue(message, event);
if (parsedValueOptional.isEmpty()) {
event.getMetadata().addTags(tagsOnFailure);
LOG.error(EVENT, "An exception occurred while using the parse_json processor on Event [{}]", event, e);
continue;
}

Map<String, Object> parsedValue = parsedValueOptional.get();

if (doUsePointer) {
parsedValue = parseUsingPointer(event, parsedValue, pointer, doWriteToRoot);
}

if (doWriteToRoot) {
writeToRoot(event, parsedValue);
} else if (overwriteIfDestinationExists || !event.containsKey(destination)) {
event.put(destination, parsedValue);
}
} catch (Exception e) {
LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e);
}
}
return records;
}

@Override
public void prepareForShutdown() {

/* nothing to do */
}

@Override
@@ -113,7 +112,11 @@ public boolean isReadyForShutdown() {

@Override
public void shutdown() {
/* nothing to do */
}

private String getProcessorName() {
return this.getClass().getAnnotation(DataPrepperPlugin.class).name();
}

private Map<String, Object> parseUsingPointer(final Event event, final Map<String, Object> parsedJson, final String pointer,
@@ -126,7 +129,7 @@ private Map<String, Object> parseUsingPointer(final Event event, final Map<Strin

final boolean pointerIsValid = temporaryEvent.containsKey(actualPointer);
if (!pointerIsValid) {
LOG.error(EVENT, "Writing entire JSON because the pointer {} is invalid on Event {}", pointer, event);
LOG.error(EVENT, "Writing entire source because the pointer {} is invalid on Event [{}]", pointer, event);
return parsedJson;
}

@@ -148,17 +151,14 @@ private String getEndOfPointer(final String trimmedPointer) {
final boolean lastElementInPathIsAnArrayIndex = elements.get(elements.size()-1).matches("[0-9]+");

if (lastElementInPathIsAnArrayIndex) {
final String lastTwoElements = elements.get(elements.size() - 2) + "/" + elements.get(elements.size() - 1);
return lastTwoElements;
return elements.get(elements.size() - 2) + "/" + elements.get(elements.size() - 1); // return the last two elements
}

return elements.get(elements.size()-1);
}
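The tail-of-pointer logic in `getEndOfPointer` can be sketched as a standalone, runnable snippet (the class and method names here are illustrative, not part of the commit):

```java
import java.util.Arrays;
import java.util.List;

public class PointerDemo {
    // Mirrors getEndOfPointer: when the last path element is an array index,
    // keep the parent key together with the index so the result stays addressable.
    static String endOfPointer(String trimmedPointer) {
        List<String> elements = Arrays.asList(trimmedPointer.split("/"));
        boolean lastElementIsArrayIndex = elements.get(elements.size() - 1).matches("[0-9]+");
        if (lastElementIsArrayIndex) {
            return elements.get(elements.size() - 2) + "/" + elements.get(elements.size() - 1);
        }
        return elements.get(elements.size() - 1);
    }

    public static void main(String[] args) {
        System.out.println(endOfPointer("outer_key/inner_key")); // inner_key
        System.out.println(endOfPointer("list/0"));              // list/0
    }
}
```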

/**
* Trim the pointer and change each front slash / to be a dot (.) to proccess
* @param pointer
* @return
* Trim the pointer and change each front slash / to be a dot (.) to process
*/
private String normalizePointerStructure(final String pointer) {
return pointer.replace('/','.');
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.parse;

import java.util.List;

public interface CommonParseConfig {
/**
* The field of the Event that contains the JSON data.
*
* @return The name of the source field.
*/
String getSource();

/**
* The destination that the parsed JSON is written to. Defaults to the root of the Event.
* If the destination field already exists, it will be overwritten.
*
* @return The name of the destination field.
*/
String getDestination();

/**
* An optional setting used to specify a JSON Pointer. Pointer points to the JSON key that will be parsed into the destination.
* There is no pointer by default, meaning that the entirety of source will be parsed. If the target key would overwrite an existing
* key in the Event then the absolute path of the target key will be placed into destination
*
* Note: (should this be configurable/what about double conflicts?)
* @return String representing JSON Pointer
*/
String getPointer();

/**
* A `List` of `String`s that specifies the tags to be set in the event the processor fails to parse or an unknown
* exception occurs while parsing. This tag may be used in conditional expressions in other parts of the configuration.
*
* @return List of tags to be set on failure
*/
List<String> getTagsOnFailure();

/**
* An optional setting used to specify a conditional expression.
* If the expression evaluates to true, the processor will parse the source field.
*
* @return String representing conditional expression
*/
String getParseWhen();

/**
* An optional setting used to specify whether the destination field should be overwritten if it already exists.
* Defaults to true.
*
* @return true if an existing destination field should be overwritten
*/
boolean getOverwriteIfDestinationExists();
}
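A minimal implementation of this interface might look like the following sketch (the class name and tag value are hypothetical; a trimmed copy of the interface is inlined so the snippet compiles standalone):

```java
import java.util.List;

// Trimmed copy of the CommonParseConfig interface above, inlined for a standalone example.
interface CommonParseConfig {
    String getSource();
    String getDestination();
    String getPointer();
    List<String> getTagsOnFailure();
    String getParseWhen();
    boolean getOverwriteIfDestinationExists();
}

// Hypothetical minimal config: parse the "message" field into the Event root.
public class SimpleParseConfig implements CommonParseConfig {
    @Override public String getSource() { return "message"; }
    @Override public String getDestination() { return null; } // null means write to the Event root
    @Override public String getPointer() { return null; }     // no pointer: parse the whole source
    @Override public List<String> getTagsOnFailure() { return List.of("_parse_failure"); }
    @Override public String getParseWhen() { return null; }
    @Override public boolean getOverwriteIfDestinationExists() { return true; }
}
```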
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.parse.ion;

import com.amazon.ion.Timestamp;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;

import java.io.IOException;

public class IonTimestampConverterModule extends SimpleModule {
public IonTimestampConverterModule() {
addSerializer(Timestamp.class, new StdSerializer<>(Timestamp.class) {
@Override
public void serialize(Timestamp value, JsonGenerator gen, SerializerProvider provider) throws IOException {
gen.writeString(value.toZString());
}
});
}
}