diff --git a/data-prepper-plugins/parse-json-processor/README.md b/data-prepper-plugins/parse-json-processor/README.md index f4c5e65f70..e2288decbc 100644 --- a/data-prepper-plugins/parse-json-processor/README.md +++ b/data-prepper-plugins/parse-json-processor/README.md @@ -60,6 +60,71 @@ The processor will parse the message into the following: * `tags_on_failure` (Optional): A `List` of `String`s that specifies the tags to be set in the event the processor fails to parse or an unknown exception occurs while parsing. This tag may be used in conditional expressions in other parts of the configuration +# Parse Ion Processor +This is a processor that takes in an Event and parses its Ion data, including any nested fields. + +> Note: Ion Timestamps are parsed into ISO-8601 Z strings, and Ion Blobs are parsed into base64 encoded strings. + +## Basic Usage +To get started, create the following `pipelines.yaml`. +```yaml +parse-ion-pipeline: + source: + stdin: + processor: + - parse_ion: + sink: + - stdout: +``` +#### Basic Example: +If you wish to test the Ion Processor with the above config then you may find the following example useful. +Run the pipeline and paste the following line into your console, and then enter `exit` on a new line. +``` +{"outer_key": {"inner_key": "inner_value"}} +``` + +The processor will parse the message into the following: +``` +{"message": {"outer_key": {"inner_key": "inner_value"}}", "outer_key":{"inner_key":"inner_value"}}} +``` +#### Example with JSON Pointer: +If you wish to parse a selection of the Ion data, you can specify a JSON Pointer using the `pointer` option in the configuration. +The following configuration file and example demonstrates a basic pointer use case. +```yaml +parse-json-pipeline: + source: + stdin: + processor: + - parse_json: + pointer: "outer_key/inner_key" + sink: + - stdout: +``` +Run the pipeline and paste the following line into your console, and then enter `exit` on a new line. +``` +{"outer_key": {"inner_key": "inner_value"}} +``` + +The processor will parse the message into the following: +``` +{"message": {"outer_key": {"inner_key": "inner_value"}}", "inner_key": "inner_value"} +``` +## Configuration +* `source` (Optional) — The field in the `Event` that will be parsed. + * Default: `message` + +* `destination` (Optional) — The destination field of the parsed Ion. Defaults to the root of the `Event`. + * Defaults to writing to the root of the `Event` (The processor will write to root when `destination` is `null`). + * Cannot be `""`, `/`, or any whitespace-only `String` because these are not valid `Event` fields. + +* `pointer` (Optional) — A JSON Pointer to the field to be parsed. + * There is no `pointer` by default, meaning the entire `source` is parsed. + * The `pointer` can access JSON Array indices as well. + * If the JSON Pointer is invalid then the entire `source` data is parsed into the outgoing `Event`. + * If the pointed-to key already exists in the `Event` and the `destination` is the root, then the entire path of the key will be used. + +* `tags_on_failure` (Optional): A `List` of `String`s that specifies the tags to be set in the event the processor fails to parse or an unknown exception occurs while parsing. This tag may be used in conditional expressions in other parts of the configuration + # JSON Sink/Output Codec This is an implementation of JSON Sink Codec that parses the Dataprepper Events into JSON Objects and writes them into the underlying OutputStream. diff --git a/data-prepper-plugins/parse-json-processor/build.gradle b/data-prepper-plugins/parse-json-processor/build.gradle index 84c3e83ec5..7d4842850a 100644 --- a/data-prepper-plugins/parse-json-processor/build.gradle +++ b/data-prepper-plugins/parse-json-processor/build.gradle @@ -15,6 +15,7 @@ dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:common') implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion' implementation 'org.apache.parquet:parquet-common:1.13.1' testImplementation project(':data-prepper-test-common') } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java similarity index 57% rename from data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java rename to data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java index c770fac74d..b113534983 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java @@ -3,20 +3,15 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.processor.parsejson; +package org.opensearch.dataprepper.plugins.processor.parse; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; -import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.processor.AbstractProcessor; -import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,12 +23,12 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; -@DataPrepperPlugin(name = "parse_json", pluginType = Processor.class, pluginConfigurationType = ParseJsonProcessorConfig.class) -public class ParseJsonProcessor extends AbstractProcessor, Record> { - private static final Logger LOG = LoggerFactory.getLogger(ParseJsonProcessor.class); +public abstract class AbstractParseProcessor extends AbstractProcessor, Record> { + private static final Logger LOG = LoggerFactory.getLogger(AbstractParseProcessor.class); private final String source; private final String destination; @@ -44,66 +39,70 @@ public class ParseJsonProcessor extends AbstractProcessor, Record< private final ExpressionEvaluator expressionEvaluator; - @DataPrepperPluginConstructor - public ParseJsonProcessor(final PluginMetrics pluginMetrics, - final ParseJsonProcessorConfig parseJsonProcessorConfig, - final ExpressionEvaluator expressionEvaluator) { + protected AbstractParseProcessor(PluginMetrics pluginMetrics, + CommonParseConfig commonParseConfig, + ExpressionEvaluator expressionEvaluator) { super(pluginMetrics); - source = parseJsonProcessorConfig.getSource(); - destination = parseJsonProcessorConfig.getDestination(); - pointer = parseJsonProcessorConfig.getPointer(); - parseWhen = parseJsonProcessorConfig.getParseWhen(); - tagsOnFailure = parseJsonProcessorConfig.getTagsOnFailure(); - overwriteIfDestinationExists = parseJsonProcessorConfig.getOverwriteIfDestinationExists(); + source = commonParseConfig.getSource(); + destination = commonParseConfig.getDestination(); + pointer = commonParseConfig.getPointer(); + parseWhen = commonParseConfig.getParseWhen(); + tagsOnFailure = commonParseConfig.getTagsOnFailure(); + overwriteIfDestinationExists = commonParseConfig.getOverwriteIfDestinationExists(); this.expressionEvaluator = expressionEvaluator; } + /** + * @return Optional HashMap of the parsed value - empty if the message was invalid (be sure to log the error) + */ + protected abstract Optional> readValue(String message, Event context); + @Override public Collection> doExecute(final Collection> records) { - final ObjectMapper objectMapper = new ObjectMapper(); final boolean doWriteToRoot = Objects.isNull(destination); final boolean doUsePointer = Objects.nonNull(pointer); for (final Record record : records) { - + try { final Event event = record.getData(); - try { - if (Objects.nonNull(parseWhen) && !expressionEvaluator.evaluateConditional(parseWhen, event)) { - continue; - } - - final String message = event.get(source, String.class); - if (Objects.isNull(message)) { - continue; - } - final TypeReference> hashMapTypeReference = new TypeReference>() { - }; - Map parsedJson = objectMapper.readValue(message, hashMapTypeReference); - - if (doUsePointer) { - parsedJson = parseUsingPointer(event, parsedJson, pointer, doWriteToRoot); - } - - if (doWriteToRoot) { - writeToRoot(event, parsedJson); - } else if (overwriteIfDestinationExists || !event.containsKey(destination)) { - event.put(destination, parsedJson); - } - } catch (final JsonProcessingException jsonException) { - event.getMetadata().addTags(tagsOnFailure); - LOG.error(EVENT, "An exception occurred due to invalid JSON while reading event [{}]", event, jsonException); - } catch (final Exception e) { + + if (Objects.nonNull(parseWhen) && !expressionEvaluator.evaluateConditional(parseWhen, event)) { + continue; + } + + final String message = event.get(source, String.class); + if (Objects.isNull(message)) { + continue; + } + + final Optional> parsedValueOptional = readValue(message, event); + if (parsedValueOptional.isEmpty()) { event.getMetadata().addTags(tagsOnFailure); - LOG.error(EVENT, "An exception occurred while using the parse_json processor on Event [{}]", event, e); + continue; } + + Map parsedValue = parsedValueOptional.get(); + + if (doUsePointer) { + parsedValue = parseUsingPointer(event, parsedValue, pointer, doWriteToRoot); + } + + if (doWriteToRoot) { + writeToRoot(event, parsedValue); + } else if (overwriteIfDestinationExists || !event.containsKey(destination)) { + event.put(destination, parsedValue); + } + } catch (Exception e) { + LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e); + } } return records; } @Override public void prepareForShutdown() { - + /* nothing to do */ } @Override @@ -113,7 +112,11 @@ public boolean isReadyForShutdown() { @Override public void shutdown() { + /* nothing to do */ + } + private String getProcessorName() { + return this.getClass().getAnnotation(DataPrepperPlugin.class).name(); } private Map parseUsingPointer(final Event event, final Map parsedJson, final String pointer, @@ -126,7 +129,7 @@ private Map parseUsingPointer(final Event event, final Map getTagsOnFailure(); + + /** + * An optional setting used to specify a conditional expression. + * If the expression evaluates to true, the processor will parse the source field. + * + * @return String representing conditional expression + */ + String getParseWhen(); + + /** + * An optional setting used to specify whether the destination field should be overwritten if it already exists. + * Defaults to true. + */ + boolean getOverwriteIfDestinationExists(); +} diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/IonTimestampConverterModule.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/IonTimestampConverterModule.java new file mode 100644 index 0000000000..3813e0e8d6 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/IonTimestampConverterModule.java @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.parse.ion; + +import com.amazon.ion.Timestamp; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; + +import java.io.IOException; + +public class IonTimestampConverterModule extends SimpleModule { + public IonTimestampConverterModule() { + addSerializer(Timestamp.class, new StdSerializer<>(Timestamp.class) { + @Override + public void serialize(Timestamp value, JsonGenerator gen, SerializerProvider provider) throws IOException { + gen.writeString(value.toZString()); + } + }); + } +} diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessor.java new file mode 100644 index 0000000000..39c06165d9 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessor.java @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.parse.ion; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.dataformat.ion.IonObjectMapper; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Optional; + +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; + +@DataPrepperPlugin(name = "parse_ion", pluginType = Processor.class, pluginConfigurationType = ParseIonProcessorConfig.class) +public class ParseIonProcessor extends AbstractParseProcessor { + private static final Logger LOG = LoggerFactory.getLogger(ParseIonProcessor.class); + + private final IonObjectMapper objectMapper = new IonObjectMapper(); + + @DataPrepperPluginConstructor + public ParseIonProcessor(final PluginMetrics pluginMetrics, + final ParseIonProcessorConfig parseIonProcessorConfig, + final ExpressionEvaluator expressionEvaluator) { + super(pluginMetrics, parseIonProcessorConfig, expressionEvaluator); + + // Convert Timestamps to ISO-8601 Z strings + objectMapper.registerModule(new IonTimestampConverterModule()); + } + + @Override + protected Optional> readValue(String message, Event context) { + try { + // We need to do a two-step process here, read the value in, then convert away any Ion types like Timestamp + return Optional.of(objectMapper.convertValue(objectMapper.readValue(message, new TypeReference<>() {}), new TypeReference<>() {})); + } catch (JsonProcessingException e) { + LOG.error(EVENT, "An exception occurred due to invalid Ion while reading event [{}]", context, e); + return Optional.empty(); + } catch (Exception e) { + LOG.error(EVENT, "An exception occurred while using the parse_ion processor on Event [{}]", context, e); + return Optional.empty(); + } + } +} diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java new file mode 100644 index 0000000000..67a2f464ad --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java @@ -0,0 +1,73 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.parse.ion; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.AssertTrue; +import jakarta.validation.constraints.NotBlank; +import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig; + +import java.util.List; +import java.util.Objects; + +public class ParseIonProcessorConfig implements CommonParseConfig { + static final String DEFAULT_SOURCE = "message"; + + @NotBlank + @JsonProperty("source") + private String source = DEFAULT_SOURCE; + + @JsonProperty("destination") + private String destination; + + @JsonProperty("pointer") + private String pointer; + + @JsonProperty("parse_when") + private String parseWhen; + + @JsonProperty("tags_on_failure") + private List tagsOnFailure; + + @JsonProperty("overwrite_if_destination_exists") + private boolean overwriteIfDestinationExists = true; + + @Override + public String getSource() { + return source; + } + + @Override + public String getDestination() { + return destination; + } + + @Override + public String getPointer() { + return pointer; + } + + @Override + public List getTagsOnFailure() { + return tagsOnFailure; + } + + @Override + public String getParseWhen() { return parseWhen; } + + @Override + public boolean getOverwriteIfDestinationExists() { + return overwriteIfDestinationExists; + } + + @AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)") + boolean isValidDestination() { + if (Objects.isNull(destination)) return true; + + final String trimmedDestination = destination.trim(); + return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/")); + } +} diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessor.java new file mode 100644 index 0000000000..c62d2af9c2 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessor.java @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.parse.json; + +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.processor.Processor; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Optional; + +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; + +@DataPrepperPlugin(name = "parse_json", pluginType = Processor.class, pluginConfigurationType = ParseJsonProcessorConfig.class) +public class ParseJsonProcessor extends AbstractParseProcessor { + private static final Logger LOG = LoggerFactory.getLogger(ParseJsonProcessor.class); + + private final ObjectMapper objectMapper = new ObjectMapper(); + + @DataPrepperPluginConstructor + public ParseJsonProcessor(final PluginMetrics pluginMetrics, + final ParseJsonProcessorConfig parseJsonProcessorConfig, + final ExpressionEvaluator expressionEvaluator) { + super(pluginMetrics, parseJsonProcessorConfig, expressionEvaluator); + } + + @Override + protected Optional> readValue(String message, Event context) { + try { + return Optional.of(objectMapper.readValue(message, new TypeReference<>() {})); + } catch (JsonProcessingException e) { + LOG.error(EVENT, "An exception occurred due to invalid JSON while reading event [{}]", context, e); + return Optional.empty(); + } catch (Exception e) { + LOG.error(EVENT, "An exception occurred while using the parse_json processor on Event [{}]", context, e); + return Optional.empty(); + } + } +} diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java similarity index 61% rename from data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfig.java rename to data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java index 6295f12fba..e0a2e91c1d 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java @@ -3,16 +3,17 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.processor.parsejson; +package org.opensearch.dataprepper.plugins.processor.parse.json; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotBlank; +import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig; import java.util.Objects; import java.util.List; -public class ParseJsonProcessorConfig { +public class ParseJsonProcessorConfig implements CommonParseConfig { static final String DEFAULT_SOURCE = "message"; @NotBlank @@ -34,43 +35,30 @@ public class ParseJsonProcessorConfig { @JsonProperty("overwrite_if_destination_exists") private boolean overwriteIfDestinationExists = true; - /** - * The field of the Event that contains the JSON data. - * - * @return The name of the source field. - */ + @Override public String getSource() { return source; } - /** - * The destination that the parsed JSON is written to. Defaults to the root of the Event. - * If the destination field already exists, it will be overwritten. - * - * @return The name of the destination field. - */ + @Override public String getDestination() { return destination; } - /** - * An optional setting used to specify a JSON Pointer. Pointer points to the JSON key that will be parsed into the destination. - * There is no pointer by default, meaning that the entirety of source will be parsed. If the target key would overwrite an existing - * key in the Event then the absolute path of the target key will be placed into destination - * - * Note: (should this be configurable/what about double conflicts?) - * @return String representing JSON Pointer - */ + @Override public String getPointer() { return pointer; } + @Override public List getTagsOnFailure() { return tagsOnFailure; } + @Override public String getParseWhen() { return parseWhen; } + @Override public boolean getOverwriteIfDestinationExists() { return overwriteIfDestinationExists; } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/IonTimestampConverterModuleTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/IonTimestampConverterModuleTest.java new file mode 100644 index 0000000000..12fdbe88e6 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/IonTimestampConverterModuleTest.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.parse.ion; + +import com.amazon.ion.Timestamp; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.dataformat.ion.IonObjectMapper; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class IonTimestampConverterModuleTest { + @Test void test_when_module_is_installed_then_returns_string_representation_of_timestamp() throws JsonProcessingException { + final IonObjectMapper objectMapper = new IonObjectMapper(); + objectMapper.registerModule(new IonTimestampConverterModule()); + + final String timestamp = "2023-11-30T21:05:23.383Z"; + final String expectedValue = "\"" + timestamp + "\""; + final String actualValue = objectMapper.writeValueAsString(Timestamp.valueOf(timestamp)); + + assertEquals(expectedValue, actualValue); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java new file mode 100644 index 0000000000..0fb274ba13 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.parse.ion; + +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; + +public class ParseIonProcessorConfigTest { + + private ParseIonProcessorConfig createObjectUnderTest() { + return new ParseIonProcessorConfig(); + } + + @Test + public void test_when_defaultParseIonProcessorConfig_then_returns_default_values() { + final ParseIonProcessorConfig objectUnderTest = createObjectUnderTest(); + + assertThat(objectUnderTest.getSource(), equalTo(ParseIonProcessorConfig.DEFAULT_SOURCE)); + assertThat(objectUnderTest.getDestination(), equalTo(null)); + assertThat(objectUnderTest.getPointer(), equalTo(null)); + assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null)); + assertThat(objectUnderTest.getOverwriteIfDestinationExists(), equalTo(true)); + } + + @Nested + class Validation { + final ParseIonProcessorConfig config = createObjectUnderTest(); + + @Test + void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse() + throws NoSuchFieldException, IllegalAccessException { + setField(ParseIonProcessorConfig.class, config, "destination", "good destination"); + + assertThat(config.isValidDestination(), equalTo(true)); + + setField(ParseIonProcessorConfig.class, config, "destination", ""); + + assertThat(config.isValidDestination(), equalTo(false)); + + setField(ParseIonProcessorConfig.class, config, "destination", " "); + + assertThat(config.isValidDestination(), equalTo(false)); + + setField(ParseIonProcessorConfig.class, config, "destination", " / "); + + assertThat(config.isValidDestination(), equalTo(false)); + List tagsList = List.of("tag1", "tag2"); + setField(ParseIonProcessorConfig.class, config, "tagsOnFailure", tagsList); + + assertThat(config.getTagsOnFailure(), equalTo(tagsList)); + } + } +} diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java new file mode 100644 index 0000000000..62873866d7 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.parse.ion; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; +import org.opensearch.dataprepper.plugins.processor.parse.json.ParseJsonProcessorTest; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ParseIonProcessorTest extends ParseJsonProcessorTest { + @Mock + private ParseIonProcessorConfig ionProcessorConfig; + + @BeforeEach + @Override + public void setup() { + processorConfig = ionProcessorConfig; + ParseIonProcessorConfig defaultConfig = new ParseIonProcessorConfig(); + when(processorConfig.getSource()).thenReturn(defaultConfig.getSource()); + when(processorConfig.getDestination()).thenReturn(defaultConfig.getDestination()); + when(processorConfig.getPointer()).thenReturn(defaultConfig.getPointer()); + when(processorConfig.getParseWhen()).thenReturn(null); + when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(true); + } + + @Override + protected AbstractParseProcessor createObjectUnderTest() { + return new ParseIonProcessor(pluginMetrics, ionProcessorConfig, expressionEvaluator); + } + + @Test + void test_when_using_ion_features_then_processorParsesCorrectly() { + parseJsonProcessor = createObjectUnderTest(); + + final String serializedMessage = "{bareKey: 1, symbol: SYMBOL, timestamp: 2023-11-30T21:05:23.383Z, attribute: dollars::100.0 }"; + final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + + assertThat(parsedEvent.get("bareKey", Integer.class), equalTo(1)); + assertThat(parsedEvent.get("symbol", String.class), equalTo("SYMBOL")); + assertThat(parsedEvent.get("timestamp", String.class), equalTo("2023-11-30T21:05:23.383Z")); + assertThat(parsedEvent.get("attribute", Double.class), equalTo(100.0)); + } +} diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java similarity index 89% rename from data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfigTest.java rename to data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java index 8a08e5bd36..459fab6ea5 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfigTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java @@ -3,14 +3,13 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.processor.parsejson; +package org.opensearch.dataprepper.plugins.processor.parse.json; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.opensearch.dataprepper.plugins.processor.parsejson.ParseJsonProcessorConfig.DEFAULT_SOURCE; import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; import java.util.List; @@ -25,7 +24,7 @@ private ParseJsonProcessorConfig createObjectUnderTest() { public void test_when_defaultParseJsonProcessorConfig_then_returns_default_values() { final ParseJsonProcessorConfig objectUnderTest = createObjectUnderTest(); - assertThat(objectUnderTest.getSource(), equalTo(DEFAULT_SOURCE)); + assertThat(objectUnderTest.getSource(), equalTo(ParseJsonProcessorConfig.DEFAULT_SOURCE)); assertThat(objectUnderTest.getDestination(), equalTo(null)); assertThat(objectUnderTest.getPointer(), equalTo(null)); assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null)); diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java similarity index 96% rename from data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorTest.java rename to data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java index 8e2ea861ab..4594cbe2f5 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.processor.parsejson; +package org.opensearch.dataprepper.plugins.processor.parse.json; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -15,6 +15,8 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; +import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig; import java.util.ArrayList; import java.util.Collections; @@ -31,22 +33,25 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -class ParseJsonProcessorTest { +public class ParseJsonProcessorTest { private static final String DEEPLY_NESTED_KEY_NAME = "base"; + protected CommonParseConfig processorConfig; + @Mock - private ParseJsonProcessorConfig processorConfig; + private ParseJsonProcessorConfig jsonProcessorConfig; @Mock - private PluginMetrics pluginMetrics; + protected PluginMetrics pluginMetrics; @Mock - private ExpressionEvaluator expressionEvaluator; + protected ExpressionEvaluator expressionEvaluator; - private ParseJsonProcessor parseJsonProcessor; + protected AbstractParseProcessor parseJsonProcessor; @BeforeEach - void setup() { + public void setup() { + processorConfig = jsonProcessorConfig; ParseJsonProcessorConfig defaultConfig = new ParseJsonProcessorConfig(); when(processorConfig.getSource()).thenReturn(defaultConfig.getSource()); when(processorConfig.getDestination()).thenReturn(defaultConfig.getDestination()); @@ -55,8 +60,8 @@ void setup() { when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(true); } - private ParseJsonProcessor createObjectUnderTest() { - return new ParseJsonProcessor(pluginMetrics, processorConfig, expressionEvaluator); + protected AbstractParseProcessor createObjectUnderTest() { + return new ParseJsonProcessor(pluginMetrics, jsonProcessorConfig, expressionEvaluator); } @Test @@ -394,7 +399,7 @@ private Object deepJsonMapHelper(final int currentLayer, final int numberOfLayer return Collections.singletonMap(key, deepJsonMapHelper(currentLayer+1, numberOfLayers)); } - private Event createAndParseMessageEvent(final String message) { + protected Event createAndParseMessageEvent(final String message) { final Record eventUnderTest = createMessageEvent(message); final List> editedEvents = (List>) parseJsonProcessor.doExecute( Collections.singletonList(eventUnderTest));