From 379c6aed6333e7a69014c183e0582eddbe63b0ba Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Mon, 8 Jan 2024 16:55:41 +0000 Subject: [PATCH 01/10] Add truncate string processor Signed-off-by: Krishna Kondaka --- .../mutatestring/TruncateStringProcessor.java | 42 +++++++ .../TruncateStringProcessorConfig.java | 61 ++++++++++ .../TruncateStringProcessorTests.java | 115 ++++++++++++++++++ 3 files changed, 218 insertions(+) create mode 100644 data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessor.java create mode 100644 data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorConfig.java create mode 100644 data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java diff --git a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessor.java b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessor.java new file mode 100644 index 0000000000..6f63a26e4e --- /dev/null +++ b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessor.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.mutatestring; + +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.processor.Processor; + +/** + * This processor takes in a key and changes its value to a string with the leading and trailing spaces trimmed. + * If the value is not a string, no action is performed. + */ +@DataPrepperPlugin(name = "truncate_string", pluginType = Processor.class, pluginConfigurationType = TruncateStringProcessorConfig.class) +public class TruncateStringProcessor extends AbstractStringProcessor { + private final ExpressionEvaluator expressionEvaluator; + + @DataPrepperPluginConstructor + public TruncateStringProcessor(final PluginMetrics pluginMetrics, final TruncateStringProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { + super(pluginMetrics, config); + this.expressionEvaluator = expressionEvaluator; + } + + @Override + protected void performKeyAction(final Event recordEvent, final TruncateStringProcessorConfig.Entry entry, final String value) { + if (entry.getTruncateWhen() != null && !expressionEvaluator.evaluateConditional(entry.getTruncateWhen(), recordEvent)) { + return; + } + recordEvent.put(entry.getSource(), value.substring(0, entry.getLength())); + } + + @Override + protected String getKey(final TruncateStringProcessorConfig.Entry entry) { + return entry.getSource(); + } +} + diff --git a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorConfig.java b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorConfig.java new file mode 100644 index 0000000000..0147ff1204 --- /dev/null +++ b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorConfig.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.mutatestring; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; + +import java.util.List; + +public class TruncateStringProcessorConfig implements StringProcessorConfig { + public static class Entry { + + @NotEmpty + @NotNull + private String source; + + @JsonProperty("length") + @NotNull + private int length; + + @JsonProperty("truncate_when") + private String truncateWhen; + + public String getSource() { + return source; + } + + public int getLength() { + return length; + } + + public String getTruncateWhen() { return truncateWhen; } + + public Entry(final String source, final int length, final String truncateWhen) { + this.source = source; + this.length = length; + this.truncateWhen = truncateWhen; + } + + public Entry() {} + } + + @Override + @JsonIgnore + public List getIterativeConfig() { + return entries; + } + + private List<@Valid Entry> entries; + + public List getEntries() { + return entries; + } +} + diff --git a/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java b/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java new file mode 100644 index 0000000000..fb50ec59c7 --- /dev/null +++ b/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.mutatestring; + +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class TruncateStringProcessorTests { + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private TruncateStringProcessorConfig config; + + @Mock + private ExpressionEvaluator expressionEvaluator; + + private TruncateStringProcessor createObjectUnderTest() { + return new TruncateStringProcessor(pluginMetrics, config, expressionEvaluator); + } + + @ParameterizedTest + @ArgumentsSource(TruncateStringArgumentsProvider.class) + void testTruncateStringProcessor(final String message, final int truncateLength, final String truncatedMessage) { + + when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", truncateLength, null))); + + final TruncateStringProcessor truncateStringProcessor = createObjectUnderTest(); + final Record record = createEvent(message); + final List> truncatedRecords = (List>) truncateStringProcessor.doExecute(Collections.singletonList(record)); + assertThat(truncatedRecords.get(0).getData().get("message", Object.class), notNullValue()); + assertThat(truncatedRecords.get(0).getData().get("message", Object.class), equalTo(truncatedMessage)); + } + + public void testLengthNotDefinedThrowsError() { + when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", null, null))); + when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", null, null))); + + assertThrows(IllegalArgumentException.class, () -> createObjectUnderTest()); + } + + @Test + void test_event_is_the_same_when_truncateWhen_condition_returns_false() { + final String truncateWhen = UUID.randomUUID().toString(); + final String message = UUID.randomUUID().toString(); + + when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", 5, truncateWhen))); + + final TruncateStringProcessor truncateStringProcessor = createObjectUnderTest(); + final Record record = createEvent(message); + when(expressionEvaluator.evaluateConditional(truncateWhen, record.getData())).thenReturn(false); + final List> truncatedRecords = (List>) truncateStringProcessor.doExecute(Collections.singletonList(record)); + + assertThat(truncatedRecords.get(0).getData().toMap(), equalTo(record.getData().toMap())); + } + + + private TruncateStringProcessorConfig.Entry createEntry(final String source, final Integer length, final String truncateWhen) { + return new TruncateStringProcessorConfig.Entry(source, length, truncateWhen); + } + + private Record createEvent(final String message) { + final Map eventData = new HashMap<>(); + eventData.put("message", message); + return new Record<>(JacksonEvent.builder() + .withEventType("event") + .withData(eventData) + .build()); + } + + static class TruncateStringArgumentsProvider implements ArgumentsProvider { + + @Override + public Stream provideArguments(ExtensionContext context) { + return Stream.of( + Arguments.arguments("hello,world,no-truncate", 100, "hello,world,no-truncate"), + Arguments.arguments("hello,world,truncate", 11, "hello,world"), + Arguments.arguments("hello,world", 1, "h"), + Arguments.arguments("hello", 0, "") + ); + } + } + +} + From 743f3475c484215229a54c6a39ed5d13fe4f0f16 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Mon, 8 Jan 2024 22:05:38 +0000 Subject: [PATCH 02/10] Addressed review comments Signed-off-by: Krishna Kondaka --- .../mutate-string-processors/README.md | 51 +++++++++++++++++++ .../mutatestring/TruncateStringProcessor.java | 9 +++- .../TruncateStringProcessorConfig.java | 21 ++++++-- .../TruncateStringProcessorTests.java | 31 +++++++---- 4 files changed, 95 insertions(+), 17 deletions(-) diff --git a/data-prepper-plugins/mutate-string-processors/README.md b/data-prepper-plugins/mutate-string-processors/README.md index 92f5f492e6..acd80dd8ec 100644 --- a/data-prepper-plugins/mutate-string-processors/README.md +++ b/data-prepper-plugins/mutate-string-processors/README.md @@ -196,6 +196,57 @@ When you run Data Prepper with this `pipeline.yaml`, you should see the followin ### Configuration * `with_keys` - (required) - A list of keys to trim the whitespace from +## TruncateStringProcessor +A processor that truncates string by removing user configured number of characters at beginning or at the end or both sides of a string. + +### Basic Usage +To get started, create the following `pipeline.yaml`. +```yaml +pipeline: + source: + file: + path: "/full/path/to/logs_json.log" + record_type: "event" + format: "json" + processor: + - trucate_string: + entries: + - source: "message" + length: 5 + sink: + - stdout: +``` + +Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file. + +```json +{"message": "hello,world"} +``` +When you run Data Prepper with this `pipeline.yaml`, you should see the following output: + +```json +{"message":["hello"]} +``` + +If the above yaml file has additional config of `start_at: 2`, then the output would be following: + +```json +{"message":["llo,w"]} +``` + +If the above yaml file has additional config of `start_at: 2`, and does not have `length: 5` in the config, then the output would be following: + +```json +{"message":["llo,world"]} +``` + +### Configuration +* `entries` - (required) - A list of entries to add to an event + * `source` - (required) - The key to be modified + * `start_at` - (optional) - starting index of the string. Defaults to 0. + * `length` - (optional) - length of the string after truncation. Defaults to end of the string. +Either `start_at` or `length` or both must be present + --- ## Developer Guide diff --git a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessor.java b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessor.java index 6f63a26e4e..a4ba800e5d 100644 --- a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessor.java +++ b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessor.java @@ -13,7 +13,8 @@ import org.opensearch.dataprepper.model.processor.Processor; /** - * This processor takes in a key and changes its value to a string with the leading and trailing spaces trimmed. + * This processor takes in a key and truncates its value to a string with + * characters from the front or at the end or at both removed. * If the value is not a string, no action is performed. */ @DataPrepperPlugin(name = "truncate_string", pluginType = Processor.class, pluginConfigurationType = TruncateStringProcessorConfig.class) @@ -31,7 +32,11 @@ protected void performKeyAction(final Event recordEvent, final TruncateStringPro if (entry.getTruncateWhen() != null && !expressionEvaluator.evaluateConditional(entry.getTruncateWhen(), recordEvent)) { return; } - recordEvent.put(entry.getSource(), value.substring(0, entry.getLength())); + int startIndex = entry.getStartAt() == null ? 0 : entry.getStartAt(); + Integer length = entry.getLength(); + String truncatedValue = (length == null || startIndex+length >= value.length()) ? value.substring(startIndex) : value.substring(startIndex, startIndex + length); + + recordEvent.put(entry.getSource(), truncatedValue); } @Override diff --git a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorConfig.java b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorConfig.java index 0147ff1204..ea8f8336d2 100644 --- a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorConfig.java +++ b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorConfig.java @@ -10,6 +10,7 @@ import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.AssertTrue; import java.util.List; @@ -21,8 +22,10 @@ public static class Entry { private String source; @JsonProperty("length") - @NotNull - private int length; + private Integer length; + + @JsonProperty("start_at") + private Integer startAt; @JsonProperty("truncate_when") private String truncateWhen; @@ -31,14 +34,24 @@ public String getSource() { return source; } - public int getLength() { + public Integer getStartAt() { + return startAt; + } + + public Integer getLength() { return length; } + @AssertTrue(message = "At least one of start_at or length or both must be specified") + public boolean hasStartAtOrLength() { + return length != null || startAt != null; + } + public String getTruncateWhen() { return truncateWhen; } - public Entry(final String source, final int length, final String truncateWhen) { + public Entry(final String source, final Integer startAt, final Integer length, final String truncateWhen) { this.source = source; + this.startAt = startAt; this.length = length; this.truncateWhen = truncateWhen; } diff --git a/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java b/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java index fb50ec59c7..f6124300c2 100644 --- a/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java +++ b/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java @@ -15,6 +15,7 @@ import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import static org.junit.jupiter.params.provider.Arguments.arguments; import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; import org.mockito.Mock; @@ -51,9 +52,9 @@ private TruncateStringProcessor createObjectUnderTest() { @ParameterizedTest @ArgumentsSource(TruncateStringArgumentsProvider.class) - void testTruncateStringProcessor(final String message, final int truncateLength, final String truncatedMessage) { + void testTruncateStringProcessor(final String message, final Integer startAt, final Integer truncateLength, final String truncatedMessage) { - when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", truncateLength, null))); + when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", startAt, truncateLength, null))); final TruncateStringProcessor truncateStringProcessor = createObjectUnderTest(); final Record record = createEvent(message); @@ -63,8 +64,8 @@ void testTruncateStringProcessor(final String message, final int truncateLength, } public void testLengthNotDefinedThrowsError() { - when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", null, null))); - when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", null, null))); + when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", null, null, null))); + when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", null, null, null))); assertThrows(IllegalArgumentException.class, () -> createObjectUnderTest()); } @@ -74,7 +75,7 @@ void test_event_is_the_same_when_truncateWhen_condition_returns_false() { final String truncateWhen = UUID.randomUUID().toString(); final String message = UUID.randomUUID().toString(); - when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", 5, truncateWhen))); + when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", null, 5, truncateWhen))); final TruncateStringProcessor truncateStringProcessor = createObjectUnderTest(); final Record record = createEvent(message); @@ -85,8 +86,8 @@ void test_event_is_the_same_when_truncateWhen_condition_returns_false() { } - private TruncateStringProcessorConfig.Entry createEntry(final String source, final Integer length, final String truncateWhen) { - return new TruncateStringProcessorConfig.Entry(source, length, truncateWhen); + private TruncateStringProcessorConfig.Entry createEntry(final String source, final Integer startAt, final Integer length, final String truncateWhen) { + return new TruncateStringProcessorConfig.Entry(source, startAt, length, truncateWhen); } private Record createEvent(final String message) { @@ -103,10 +104,18 @@ static class TruncateStringArgumentsProvider implements ArgumentsProvider { @Override public Stream provideArguments(ExtensionContext context) { return Stream.of( - Arguments.arguments("hello,world,no-truncate", 100, "hello,world,no-truncate"), - Arguments.arguments("hello,world,truncate", 11, "hello,world"), - Arguments.arguments("hello,world", 1, "h"), - Arguments.arguments("hello", 0, "") + arguments("hello,world,no-truncate", 0, 100, "hello,world,no-truncate"), + arguments("hello,world,no-truncate", 6, 100, "world,no-truncate"), + arguments("hello,world,no-truncate", 6, 16, "world,no-truncat"), + arguments("hello,world,no-truncate", 6, 17, "world,no-truncate"), + arguments("hello,world,no-truncate", 6, 18, "world,no-truncate"), + arguments("hello,world,no-truncate", 6, 5, "world"), + arguments("hello,world,no-truncate", 6, null, "world,no-truncate"), + + arguments("hello,world,no-truncate", null, 100, "hello,world,no-truncate"), + arguments("hello,world,truncate", null, 11, "hello,world"), + arguments("hello,world", null, 1, "h"), + arguments("hello", null, 0, "") ); } } From 49aa0eb3838bdf90a33474d4b621259584a360ad Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Mon, 8 Jan 2024 23:03:46 +0000 Subject: [PATCH 03/10] Added check for negative numbers in the config input Signed-off-by: Krishna Kondaka --- .../mutatestring/TruncateStringProcessorConfig.java | 13 +++++++++++-- .../mutatestring/TruncateStringProcessorTests.java | 10 ++++++---- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorConfig.java b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorConfig.java index ea8f8336d2..5c0239d700 100644 --- a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorConfig.java +++ b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorConfig.java @@ -42,9 +42,18 @@ public Integer getLength() { return length; } - @AssertTrue(message = "At least one of start_at or length or both must be specified") + @AssertTrue(message = "At least one of start_at or length or both must be specified and the values must be positive integers") public boolean hasStartAtOrLength() { - return length != null || startAt != null; + if (length == null && startAt == null) { + return false; + } + if (length != null && length < 0) { + return false; + } + if (startAt != null && startAt < 0) { + return false; + } + return true; } public String getTruncateWhen() { return truncateWhen; } diff --git a/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java b/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java index f6124300c2..9baa329e93 100644 --- a/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java +++ b/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java @@ -63,11 +63,13 @@ void testTruncateStringProcessor(final String message, final Integer startAt, fi assertThat(truncatedRecords.get(0).getData().get("message", Object.class), equalTo(truncatedMessage)); } - public void testLengthNotDefinedThrowsError() { - when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", null, null, null))); - when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", null, null, null))); + @Test + public void testInputValidation() { + assertThat(createEntry("message", null, null, null).hasStartAtOrLength(), equalTo(false)); + assertThat(createEntry("message", null, -5, null).hasStartAtOrLength(), equalTo(false)); + assertThat(createEntry("message", -5, null, null).hasStartAtOrLength(), equalTo(false)); + assertThat(createEntry("message", -5, -6, null).hasStartAtOrLength(), equalTo(false)); - assertThrows(IllegalArgumentException.class, () -> createObjectUnderTest()); } @Test From f73f5538947ae80add3d7e7c93948418c01a1c79 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 9 Jan 2024 04:09:44 +0000 Subject: [PATCH 04/10] Fixed checkstyle error Signed-off-by: Krishna Kondaka --- .../processor/mutatestring/TruncateStringProcessorTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java b/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java index 9baa329e93..351cf0451c 100644 --- a/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java +++ b/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java @@ -31,7 +31,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) From 96c2568db847f104a6cfde3d845d7a099e4853b6 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 9 Jan 2024 22:17:45 +0000 Subject: [PATCH 05/10] Modified to make truncate processor a top level processor, not specific to strings Signed-off-by: Krishna Kondaka --- .../mutate-string-processors/README.md | 51 ---------- .../mutatestring/TruncateStringProcessor.java | 47 ---------- .../truncate-processor/README.md | 92 ++++++++++++++++++ .../truncate-processor/build.gradle | 17 ++++ .../truncate/TruncateStringProcessor.java | 94 +++++++++++++++++++ .../TruncateStringProcessorConfig.java | 11 +-- .../TruncateStringProcessorTests.java | 16 ++-- settings.gradle | 1 + 8 files changed, 215 insertions(+), 114 deletions(-) delete mode 100644 data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessor.java create mode 100644 data-prepper-plugins/truncate-processor/README.md create mode 100644 data-prepper-plugins/truncate-processor/build.gradle create mode 100644 data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessor.java rename data-prepper-plugins/{mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring => truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate}/TruncateStringProcessorConfig.java (84%) rename data-prepper-plugins/{mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring => truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate}/TruncateStringProcessorTests.java (85%) diff --git a/data-prepper-plugins/mutate-string-processors/README.md b/data-prepper-plugins/mutate-string-processors/README.md index acd80dd8ec..92f5f492e6 100644 --- a/data-prepper-plugins/mutate-string-processors/README.md +++ b/data-prepper-plugins/mutate-string-processors/README.md @@ -196,57 +196,6 @@ When you run Data Prepper with this `pipeline.yaml`, you should see the followin ### Configuration * `with_keys` - (required) - A list of keys to trim the whitespace from -## TruncateStringProcessor -A processor that truncates string by removing user configured number of characters at beginning or at the end or both sides of a string. - -### Basic Usage -To get started, create the following `pipeline.yaml`. -```yaml -pipeline: - source: - file: - path: "/full/path/to/logs_json.log" - record_type: "event" - format: "json" - processor: - - trucate_string: - entries: - - source: "message" - length: 5 - sink: - - stdout: -``` - -Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file. - -```json -{"message": "hello,world"} -``` -When you run Data Prepper with this `pipeline.yaml`, you should see the following output: - -```json -{"message":["hello"]} -``` - -If the above yaml file has additional config of `start_at: 2`, then the output would be following: - -```json -{"message":["llo,w"]} -``` - -If the above yaml file has additional config of `start_at: 2`, and does not have `length: 5` in the config, then the output would be following: - -```json -{"message":["llo,world"]} -``` - -### Configuration -* `entries` - (required) - A list of entries to add to an event - * `source` - (required) - The key to be modified - * `start_at` - (optional) - starting index of the string. Defaults to 0. - * `length` - (optional) - length of the string after truncation. Defaults to end of the string. -Either `start_at` or `length` or both must be present - --- ## Developer Guide diff --git a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessor.java b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessor.java deleted file mode 100644 index a4ba800e5d..0000000000 --- a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessor.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.processor.mutatestring; - -import org.opensearch.dataprepper.expression.ExpressionEvaluator; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; -import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.processor.Processor; - -/** - * This processor takes in a key and truncates its value to a string with - * characters from the front or at the end or at both removed. - * If the value is not a string, no action is performed. - */ -@DataPrepperPlugin(name = "truncate_string", pluginType = Processor.class, pluginConfigurationType = TruncateStringProcessorConfig.class) -public class TruncateStringProcessor extends AbstractStringProcessor { - private final ExpressionEvaluator expressionEvaluator; - - @DataPrepperPluginConstructor - public TruncateStringProcessor(final PluginMetrics pluginMetrics, final TruncateStringProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { - super(pluginMetrics, config); - this.expressionEvaluator = expressionEvaluator; - } - - @Override - protected void performKeyAction(final Event recordEvent, final TruncateStringProcessorConfig.Entry entry, final String value) { - if (entry.getTruncateWhen() != null && !expressionEvaluator.evaluateConditional(entry.getTruncateWhen(), recordEvent)) { - return; - } - int startIndex = entry.getStartAt() == null ? 0 : entry.getStartAt(); - Integer length = entry.getLength(); - String truncatedValue = (length == null || startIndex+length >= value.length()) ? value.substring(startIndex) : value.substring(startIndex, startIndex + length); - - recordEvent.put(entry.getSource(), truncatedValue); - } - - @Override - protected String getKey(final TruncateStringProcessorConfig.Entry entry) { - return entry.getSource(); - } -} - diff --git a/data-prepper-plugins/truncate-processor/README.md b/data-prepper-plugins/truncate-processor/README.md new file mode 100644 index 0000000000..835f88f757 --- /dev/null +++ b/data-prepper-plugins/truncate-processor/README.md @@ -0,0 +1,92 @@ +# Truncate Processor + +This is a processor that truncates key's value at the beginning or at the end or at both sides of a string as per the configuration. If the key's value is a list, then each of the string members of the list are truncated. Non-string members of the list are left untouched. If `truncate_when` option is provided, the truncation of the input is done only when the condition specified is true for the event being processed. + +## Basic Usage +To get started, create the following `pipeline.yaml`. +```yaml +pipeline: + source: + file: + path: "/full/path/to/logs_json.log" + record_type: "event" + format: "json" + processor: + - trucate_string: + entries: + - source: "message" + length: 5 + sink: + - stdout: +``` + +Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file. + +```json +{"message": "hello,world"} +``` +When you run Data Prepper with this `pipeline.yaml`, you should see the following output: + +```json +{"message":["hello"]} +``` + +If the above yaml file has additional config of `start_at: 2`, then the output would be following: + +```json +{"message":["llo,w"]} +``` + +If the above yaml file has additional config of `start_at: 2`, and does not have `length: 5` in the config, then the output would be following: + +```json +{"message":["llo,world"]} +``` + +If the source has an list of strings, then the result will be an array of strings where each of the member of the list is truncated. The following input +```json +{"message": ["hello_one", "hello_two", "hello_three"]} +``` +is transformed to the following: + +```json +{"message": ["hello", "hello", "hello"]} +``` + +Example configuration with `truncate_when` option: +```yaml +pipeline: + source: + file: + path: "/full/path/to/logs_json.log" + record_type: "event" + format: "json" + processor: + - trucate_string: + entries: + - source: "message" + length: 5 + start_at: 7 + truncate_when: '/id == 1' + sink: + - stdout: +``` + +When the pipeline started with the above configuration receives the following two events +```json +{"message": "hello, world", "id": 1} +{"message": "hello, world,not-truncated", "id": 2} +``` +the output would be +```json +{"message": "world", "id": 1} +{"message": "hello, world,not-truncated", "id": 2} +``` + +### Configuration +* `entries` - (required) - A list of entries to add to an event + * `source` - (required) - The key to be modified + * `start_at` - (optional) - starting index of the string. Defaults to 0. + * `length` - (optional) - length of the string after truncation. Defaults to end of the string. +Either `start_at` or `length` or both must be present + diff --git a/data-prepper-plugins/truncate-processor/build.gradle b/data-prepper-plugins/truncate-processor/build.gradle new file mode 100644 index 0000000000..59ecee5524 --- /dev/null +++ b/data-prepper-plugins/truncate-processor/build.gradle @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +dependencies { + implementation project(':data-prepper-api') + implementation project(':data-prepper-test-common') + implementation project(':data-prepper-plugins:common') + implementation 'com.fasterxml.jackson.core:jackson-databind' + testImplementation libs.commons.lang3 +} + diff --git a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessor.java b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessor.java new file mode 100644 index 0000000000..a83ff50a14 --- /dev/null +++ b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessor.java @@ -0,0 +1,94 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.truncate; + +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.processor.AbstractProcessor; +import org.opensearch.dataprepper.model.processor.Processor; + +import java.util.Collection; +import java.util.ArrayList; +import java.util.List; + +/** + * This processor takes in a key and truncates its value to a string with + * characters from the front or at the end or at both removed. + * If the value is not a string, no action is performed. + */ +@DataPrepperPlugin(name = "truncate", pluginType = Processor.class, pluginConfigurationType = TruncateStringProcessorConfig.class) +public class TruncateStringProcessor extends AbstractProcessor, Record>{ + private final List entries; + private final ExpressionEvaluator expressionEvaluator; + + @DataPrepperPluginConstructor + public TruncateStringProcessor(final PluginMetrics pluginMetrics, final TruncateStringProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { + super(pluginMetrics); + this.entries = config.getEntries(); + this.expressionEvaluator = expressionEvaluator; + } + + private String getTruncatedValue(final TruncateStringProcessorConfig.Entry entry, final String value) { + int startIndex = entry.getStartAt() == null ? 0 : entry.getStartAt(); + Integer length = entry.getLength(); + String truncatedValue = (length == null || startIndex+length >= value.length()) ? value.substring(startIndex) : value.substring(startIndex, startIndex + length); + + return truncatedValue; + } + + @Override + public Collection> doExecute(final Collection> records) { + for(final Record record : records) { + final Event recordEvent = record.getData(); + for(TruncateStringProcessorConfig.Entry entry : entries) { + if (entry.getTruncateWhen() != null && !expressionEvaluator.evaluateConditional(entry.getTruncateWhen(), recordEvent)) { + continue; + } + final String key = entry.getSource(); + if (!recordEvent.containsKey(key)) { + continue; + } + + final Object value = recordEvent.get(key, Object.class); + if (value instanceof String) { + recordEvent.put(key, getTruncatedValue(entry, (String)value)); + } else if (value instanceof List) { + List result = new ArrayList<>(); + for (Object arrayObject: (List)value) { + if (arrayObject instanceof String) { + result.add(getTruncatedValue(entry, (String)arrayObject)); + } else { + result.add(arrayObject); + } + } + recordEvent.put(key, result); + } + } + } + + return records; + } + + @Override + public void prepareForShutdown() { + + } + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + + } +} + diff --git a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorConfig.java b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessorConfig.java similarity index 84% rename from data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorConfig.java rename to data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessorConfig.java index 5c0239d700..62aa194ca5 100644 --- a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorConfig.java +++ b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessorConfig.java @@ -3,9 +3,8 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.processor.mutatestring; +package org.opensearch.dataprepper.plugins.processor.truncate; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; @@ -14,7 +13,7 @@ import java.util.List; -public class TruncateStringProcessorConfig implements StringProcessorConfig { +public class TruncateStringProcessorConfig { public static class Entry { @NotEmpty @@ -68,12 +67,6 @@ public Entry(final String source, final Integer startAt, final Integer length, f public Entry() {} } - @Override - @JsonIgnore - public List getIterativeConfig() { - return entries; - } - private List<@Valid Entry> entries; public List getEntries() { diff --git a/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessorTests.java similarity index 85% rename from data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java rename to data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessorTests.java index 351cf0451c..3380e7baed 100644 --- a/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/TruncateStringProcessorTests.java +++ b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessorTests.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.processor.mutatestring; +package org.opensearch.dataprepper.plugins.processor.truncate; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -51,12 +51,12 @@ private TruncateStringProcessor createObjectUnderTest() { @ParameterizedTest @ArgumentsSource(TruncateStringArgumentsProvider.class) - void testTruncateStringProcessor(final String message, final Integer startAt, final Integer truncateLength, final String truncatedMessage) { + void testTruncateStringProcessor(final Object messageValue, final Integer startAt, final Integer truncateLength, final Object truncatedMessage) { - when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", startAt, truncateLength, null))); + when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", startAt, truncateLength, null))); final TruncateStringProcessor truncateStringProcessor = createObjectUnderTest(); - final Record record = createEvent(message); + final Record record = createEvent(messageValue); final List> truncatedRecords = (List>) truncateStringProcessor.doExecute(Collections.singletonList(record)); assertThat(truncatedRecords.get(0).getData().get("message", Object.class), notNullValue()); assertThat(truncatedRecords.get(0).getData().get("message", Object.class), equalTo(truncatedMessage)); @@ -76,7 +76,7 @@ void test_event_is_the_same_when_truncateWhen_condition_returns_false() { final String truncateWhen = UUID.randomUUID().toString(); final String message = UUID.randomUUID().toString(); - when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", null, 5, truncateWhen))); + when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", null, 5, truncateWhen))); final TruncateStringProcessor truncateStringProcessor = createObjectUnderTest(); final Record record = createEvent(message); @@ -91,7 +91,7 @@ private TruncateStringProcessorConfig.Entry createEntry(final String source, fin return new TruncateStringProcessorConfig.Entry(source, startAt, length, truncateWhen); } - private Record createEvent(final String message) { + private Record createEvent(final Object message) { final Map eventData = new HashMap<>(); eventData.put("message", message); return new Record<>(JacksonEvent.builder() @@ -116,7 +116,9 @@ public Stream provideArguments(ExtensionContext context) { arguments("hello,world,no-truncate", null, 100, "hello,world,no-truncate"), arguments("hello,world,truncate", null, 11, "hello,world"), arguments("hello,world", null, 1, "h"), - arguments("hello", null, 0, "") + arguments("hello", null, 0, ""), + arguments(List.of("hello_one", "hello_two", "hello_three"), null, 5, List.of("hello", "hello", "hello")), + arguments(List.of("hello_one", 2, "hello_three"), null, 5, List.of("hello", 2, "hello")) ); } } diff --git a/settings.gradle b/settings.gradle index 6432826bda..2d406ee35c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -120,6 +120,7 @@ include 'data-prepper-plugins:csv-processor' include 'data-prepper-plugins:parse-json-processor' include 'data-prepper-plugins:trace-peer-forwarder-processor' include 'data-prepper-plugins:translate-processor' +include 'data-prepper-plugins:truncate-processor' include 'data-prepper-plugins:dynamodb-source-coordination-store' include 'release' include 'release:archives' From e935916c5b485091ccb349b35ab2d095c8b7a506 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 10 Jan 2024 00:10:33 +0000 Subject: [PATCH 06/10] Addressed review comments Signed-off-by: Krishna Kondaka --- .../truncate-processor/README.md | 1 - .../processor/truncate/TruncateProcessor.java | 98 ++++++++++++++ .../truncate/TruncateProcessorConfig.java | 55 ++++++++ .../truncate/TruncateStringProcessor.java | 94 -------------- .../TruncateStringProcessorConfig.java | 76 ----------- .../TruncateProcessorConfigTests.java | 98 ++++++++++++++ .../truncate/TruncateProcessorTests.java | 120 ++++++++++++++++++ 7 files changed, 371 insertions(+), 171 deletions(-) create mode 100644 data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java create mode 100644 data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java delete mode 100644 data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessor.java delete mode 100644 data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessorConfig.java create mode 100644 data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfigTests.java create mode 100644 data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java diff --git a/data-prepper-plugins/truncate-processor/README.md b/data-prepper-plugins/truncate-processor/README.md index 835f88f757..e6a8e7b7d1 100644 --- a/data-prepper-plugins/truncate-processor/README.md +++ b/data-prepper-plugins/truncate-processor/README.md @@ -89,4 +89,3 @@ the output would be * `start_at` - (optional) - starting index of the string. Defaults to 0. * `length` - (optional) - length of the string after truncation. Defaults to end of the string. Either `start_at` or `length` or both must be present - diff --git a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java new file mode 100644 index 0000000000..b93bf517c4 --- /dev/null +++ b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.truncate; + +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.processor.AbstractProcessor; +import org.opensearch.dataprepper.model.processor.Processor; + +import java.util.Collection; +import java.util.ArrayList; +import java.util.List; + +/** + * This processor takes in a key and truncates its value to a string with + * characters from the front or at the end or at both removed. + * If the value is not a string, no action is performed. + */ +@DataPrepperPlugin(name = "truncate", pluginType = Processor.class, pluginConfigurationType = TruncateProcessorConfig.class) +public class TruncateProcessor extends AbstractProcessor, Record>{ + private final ExpressionEvaluator expressionEvaluator; + private final String truncateWhen; + private final int startIndex; + private final Integer length; + private final String source; + + @DataPrepperPluginConstructor + public TruncateProcessor(final PluginMetrics pluginMetrics, final TruncateProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { + super(pluginMetrics); + this.expressionEvaluator = expressionEvaluator; + this.truncateWhen = config.getTruncateWhen(); + this.source = config.getSource(); + this.startIndex = config.getStartAt() == null ? 0 : config.getStartAt(); + this.length = config.getLength(); + } + + private String getTruncatedValue(final String value) { + String truncatedValue = + (length == null || startIndex+length >= value.length()) ? + value.substring(startIndex) : + value.substring(startIndex, startIndex + length); + + return truncatedValue; + } + + @Override + public Collection> doExecute(final Collection> records) { + for(final Record record : records) { + final Event recordEvent = record.getData(); + if (truncateWhen != null && !expressionEvaluator.evaluateConditional(truncateWhen, recordEvent)) { + continue; + } + if (!recordEvent.containsKey(source)) { + continue; + } + + final Object value = recordEvent.get(source, Object.class); + if (value instanceof String) { + recordEvent.put(source, getTruncatedValue((String)value)); + } else if (value instanceof List) { + List result = new ArrayList<>(); + for (Object listItem: (List)value) { + if (listItem instanceof String) { + result.add(getTruncatedValue((String)listItem)); + } else { + result.add(listItem); + } + } + recordEvent.put(source, result); + } + } + + return records; + } + + @Override + public void prepareForShutdown() { + + } + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + + } +} + diff --git a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java new file mode 100644 index 0000000000..29f63e54fe --- /dev/null +++ b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.truncate; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.AssertTrue; + +public class TruncateProcessorConfig { + @NotEmpty + @NotNull + private String source; + + @JsonProperty("length") + private Integer length; + + @JsonProperty("start_at") + private Integer startAt; + + @JsonProperty("truncate_when") + private String truncateWhen; + + public String getSource() { + return source; + } + + public Integer getStartAt() { + return startAt; + } + + public Integer getLength() { + return length; + } + + @AssertTrue(message = "At least one of start_at or length or both must be specified and the values must be positive integers") + public boolean isValidConfig() { + if (length == null && startAt == null) { + return false; + } + if (length != null && length < 0) { + return false; + } + if (startAt != null && startAt < 0) { + return false; + } + return true; + } + + public String getTruncateWhen() { return truncateWhen; } +} + diff --git a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessor.java b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessor.java deleted file mode 100644 index a83ff50a14..0000000000 --- a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessor.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.processor.truncate; - -import org.opensearch.dataprepper.expression.ExpressionEvaluator; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; -import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.processor.AbstractProcessor; -import org.opensearch.dataprepper.model.processor.Processor; - -import java.util.Collection; -import java.util.ArrayList; -import java.util.List; - -/** - * This processor takes in a key and truncates its value to a string with - * characters from the front or at the end or at both removed. - * If the value is not a string, no action is performed. - */ -@DataPrepperPlugin(name = "truncate", pluginType = Processor.class, pluginConfigurationType = TruncateStringProcessorConfig.class) -public class TruncateStringProcessor extends AbstractProcessor, Record>{ - private final List entries; - private final ExpressionEvaluator expressionEvaluator; - - @DataPrepperPluginConstructor - public TruncateStringProcessor(final PluginMetrics pluginMetrics, final TruncateStringProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { - super(pluginMetrics); - this.entries = config.getEntries(); - this.expressionEvaluator = expressionEvaluator; - } - - private String getTruncatedValue(final TruncateStringProcessorConfig.Entry entry, final String value) { - int startIndex = entry.getStartAt() == null ? 0 : entry.getStartAt(); - Integer length = entry.getLength(); - String truncatedValue = (length == null || startIndex+length >= value.length()) ? value.substring(startIndex) : value.substring(startIndex, startIndex + length); - - return truncatedValue; - } - - @Override - public Collection> doExecute(final Collection> records) { - for(final Record record : records) { - final Event recordEvent = record.getData(); - for(TruncateStringProcessorConfig.Entry entry : entries) { - if (entry.getTruncateWhen() != null && !expressionEvaluator.evaluateConditional(entry.getTruncateWhen(), recordEvent)) { - continue; - } - final String key = entry.getSource(); - if (!recordEvent.containsKey(key)) { - continue; - } - - final Object value = recordEvent.get(key, Object.class); - if (value instanceof String) { - recordEvent.put(key, getTruncatedValue(entry, (String)value)); - } else if (value instanceof List) { - List result = new ArrayList<>(); - for (Object arrayObject: (List)value) { - if (arrayObject instanceof String) { - result.add(getTruncatedValue(entry, (String)arrayObject)); - } else { - result.add(arrayObject); - } - } - recordEvent.put(key, result); - } - } - } - - return records; - } - - @Override - public void prepareForShutdown() { - - } - - @Override - public boolean isReadyForShutdown() { - return true; - } - - @Override - public void shutdown() { - - } -} - diff --git a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessorConfig.java b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessorConfig.java deleted file mode 100644 index 62aa194ca5..0000000000 --- a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessorConfig.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.processor.truncate; - -import com.fasterxml.jackson.annotation.JsonProperty; -import jakarta.validation.Valid; -import jakarta.validation.constraints.NotEmpty; -import jakarta.validation.constraints.NotNull; -import jakarta.validation.constraints.AssertTrue; - -import java.util.List; - -public class TruncateStringProcessorConfig { - public static class Entry { - - @NotEmpty - @NotNull - private String source; - - @JsonProperty("length") - private Integer length; - - @JsonProperty("start_at") - private Integer startAt; - - @JsonProperty("truncate_when") - private String truncateWhen; - - public String getSource() { - return source; - } - - public Integer getStartAt() { - return startAt; - } - - public Integer getLength() { - return length; - } - - @AssertTrue(message = "At least one of start_at or length or both must be specified and the values must be positive integers") - public boolean hasStartAtOrLength() { - if (length == null && startAt == null) { - return false; - } - if (length != null && length < 0) { - return false; - } - if (startAt != null && startAt < 0) { - return false; - } - return true; - } - - public String getTruncateWhen() { return truncateWhen; } - - public Entry(final String source, final Integer startAt, final Integer length, final String truncateWhen) { - this.source = source; - this.startAt = startAt; - this.length = length; - this.truncateWhen = truncateWhen; - } - - public Entry() {} - } - - private List<@Valid Entry> entries; - - public List getEntries() { - return entries; - } -} - diff --git a/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfigTests.java b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfigTests.java new file mode 100644 index 0000000000..5643876ace --- /dev/null +++ b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfigTests.java @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.truncate; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.BeforeEach; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import org.apache.commons.lang3.RandomStringUtils; +import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; + +import java.util.Random; + +class TruncateProcessorConfigTests { + TruncateProcessorConfig truncateProcessorConfig; + + Random random; + + @BeforeEach + void setUp() { + truncateProcessorConfig = new TruncateProcessorConfig(); + random = new Random(); + } + + @Test + void testDefaults() { + assertThat(truncateProcessorConfig.getSource(), equalTo(null)); + assertThat(truncateProcessorConfig.getStartAt(), equalTo(null)); + assertThat(truncateProcessorConfig.getLength(), equalTo(null)); + assertThat(truncateProcessorConfig.getTruncateWhen(), equalTo(null)); + } + + @Test + void testValidConfiguration_withStartAt() throws NoSuchFieldException, IllegalAccessException { + String source = RandomStringUtils.randomAlphabetic(10); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "source", source); + int startAt = random.nextInt(100); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "startAt", startAt); + assertThat(truncateProcessorConfig.getSource(), equalTo(source)); + assertThat(truncateProcessorConfig.getStartAt(), equalTo(startAt)); + assertTrue(truncateProcessorConfig.isValidConfig()); + } + + @Test + void testValidConfiguration_withLength() throws NoSuchFieldException, IllegalAccessException { + String source = RandomStringUtils.randomAlphabetic(10); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "source", source); + int length = random.nextInt(100); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "length", length); + assertThat(truncateProcessorConfig.getSource(), equalTo(source)); + assertThat(truncateProcessorConfig.getLength(), equalTo(length)); + assertTrue(truncateProcessorConfig.isValidConfig()); + } + + @Test + void testValidConfiguration_withLength_withTruncateWhen() throws NoSuchFieldException, IllegalAccessException { + String source = RandomStringUtils.randomAlphabetic(10); + String condition = RandomStringUtils.randomAlphabetic(10); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "source", source); + int length = random.nextInt(100); + int startAt = random.nextInt(100); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "length", length); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "startAt", startAt); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "truncateWhen", condition); + assertThat(truncateProcessorConfig.getSource(), equalTo(source)); + assertThat(truncateProcessorConfig.getLength(), equalTo(length)); + assertThat(truncateProcessorConfig.getStartAt(), equalTo(startAt)); + assertThat(truncateProcessorConfig.getTruncateWhen(), equalTo(condition)); + assertTrue(truncateProcessorConfig.isValidConfig()); + } + + @Test + void testInvalidConfiguration_StartAt_Length_BothNull() throws NoSuchFieldException, IllegalAccessException { + String source = RandomStringUtils.randomAlphabetic(10); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "source", source); + assertFalse(truncateProcessorConfig.isValidConfig()); + } + + @Test + void testInvalidConfiguration_StartAt_Length_Negative() throws NoSuchFieldException, IllegalAccessException { + String source = RandomStringUtils.randomAlphabetic(10); + int length = random.nextInt(100); + int startAt = random.nextInt(100); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "source", source); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "startAt", -startAt); + assertFalse(truncateProcessorConfig.isValidConfig()); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "startAt", startAt); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "length", -length); + assertFalse(truncateProcessorConfig.isValidConfig()); + } +} diff --git a/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java new file mode 100644 index 0000000000..d3f2a22eca --- /dev/null +++ b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java @@ -0,0 +1,120 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.truncate; + +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import static org.junit.jupiter.params.provider.Arguments.arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class TruncateProcessorTests { + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private TruncateProcessorConfig config; + + @Mock + private ExpressionEvaluator expressionEvaluator; + + private TruncateProcessor createObjectUnderTest() { + return new TruncateProcessor(pluginMetrics, config, expressionEvaluator); + } + + @ParameterizedTest + @ArgumentsSource(TruncateArgumentsProvider.class) + void testTruncateProcessor(final Object messageValue, final Integer startAt, final Integer truncateLength, final Object truncatedMessage) { + + when(config.getSource()).thenReturn("message"); + when(config.getStartAt()).thenReturn(startAt); + when(config.getLength()).thenReturn(truncateLength); + when(config.getTruncateWhen()).thenReturn(null); + + final TruncateProcessor truncateProcessor = createObjectUnderTest(); + final Record record = createEvent(messageValue); + final List> truncatedRecords = (List>) truncateProcessor.doExecute(Collections.singletonList(record)); + assertThat(truncatedRecords.get(0).getData().get("message", Object.class), notNullValue()); + assertThat(truncatedRecords.get(0).getData().get("message", Object.class), equalTo(truncatedMessage)); + } + + @Test + void test_event_is_the_same_when_truncateWhen_condition_returns_false() { + final String truncateWhen = UUID.randomUUID().toString(); + final String message = UUID.randomUUID().toString(); + + when(config.getSource()).thenReturn("message"); + when(config.getStartAt()).thenReturn(null); + when(config.getLength()).thenReturn(5); + when(config.getTruncateWhen()).thenReturn(truncateWhen); + + final TruncateProcessor truncateProcessor = createObjectUnderTest(); + final Record record = createEvent(message); + when(expressionEvaluator.evaluateConditional(truncateWhen, record.getData())).thenReturn(false); + final List> truncatedRecords = (List>) truncateProcessor.doExecute(Collections.singletonList(record)); + + assertThat(truncatedRecords.get(0).getData().toMap(), equalTo(record.getData().toMap())); + } + + + private Record createEvent(final Object message) { + final Map eventData = new HashMap<>(); + eventData.put("message", message); + return new Record<>(JacksonEvent.builder() + .withEventType("event") + .withData(eventData) + .build()); + } + + static class TruncateArgumentsProvider implements ArgumentsProvider { + + @Override + public Stream provideArguments(ExtensionContext context) { + return Stream.of( + arguments("hello,world,no-truncate", 0, 100, "hello,world,no-truncate"), + arguments("hello,world,no-truncate", 6, 100, "world,no-truncate"), + arguments("hello,world,no-truncate", 6, 16, "world,no-truncat"), + arguments("hello,world,no-truncate", 6, 17, "world,no-truncate"), + arguments("hello,world,no-truncate", 6, 18, "world,no-truncate"), + arguments("hello,world,no-truncate", 6, 5, "world"), + arguments("hello,world,no-truncate", 6, null, "world,no-truncate"), + + arguments("hello,world,no-truncate", null, 100, "hello,world,no-truncate"), + arguments("hello,world,truncate", null, 11, "hello,world"), + arguments("hello,world", null, 1, "h"), + arguments("hello", null, 0, ""), + arguments(List.of("hello_one", "hello_two", "hello_three"), null, 5, List.of("hello", "hello", "hello")), + arguments(List.of("hello_one", 2, "hello_three"), null, 5, List.of("hello", 2, "hello")) + ); + } + } + +} + From d2f107b8a8b3e295c97f76971c192ea1c5418484 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 10 Jan 2024 00:38:37 +0000 Subject: [PATCH 07/10] Updated documentation with correct configuration Signed-off-by: Krishna Kondaka --- .../truncate-processor/README.md | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/data-prepper-plugins/truncate-processor/README.md b/data-prepper-plugins/truncate-processor/README.md index e6a8e7b7d1..f2c1d9c4d8 100644 --- a/data-prepper-plugins/truncate-processor/README.md +++ b/data-prepper-plugins/truncate-processor/README.md @@ -62,12 +62,11 @@ pipeline: record_type: "event" format: "json" processor: - - trucate_string: - entries: - - source: "message" - length: 5 - start_at: 7 - truncate_when: '/id == 1' + - trucate: + source: "message" + length: 5 + start_at: 7 + truncate_when: '/id == 1' sink: - stdout: ``` @@ -84,8 +83,8 @@ the output would be ``` ### Configuration -* `entries` - (required) - A list of entries to add to an event - * `source` - (required) - The key to be modified - * `start_at` - (optional) - starting index of the string. Defaults to 0. - * `length` - (optional) - length of the string after truncation. Defaults to end of the string. +* `source` - (required) - The key to be modified +* `truncate_when` - (optional) - a condition, when it is true the truncate operation is performed. +* `start_at` - (optional) - starting index of the string. Defaults to 0. +* `length` - (optional) - length of the string after truncation. Defaults to end of the string. Either `start_at` or `length` or both must be present From 3ba6e2fe04faf1872147b5358782ec989a7e6b22 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 10 Jan 2024 01:26:53 +0000 Subject: [PATCH 08/10] Fixed typos in the documentation Signed-off-by: Krishna Kondaka --- data-prepper-plugins/truncate-processor/README.md | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/data-prepper-plugins/truncate-processor/README.md b/data-prepper-plugins/truncate-processor/README.md index f2c1d9c4d8..76a34dc431 100644 --- a/data-prepper-plugins/truncate-processor/README.md +++ b/data-prepper-plugins/truncate-processor/README.md @@ -12,10 +12,9 @@ pipeline: record_type: "event" format: "json" processor: - - trucate_string: - entries: - - source: "message" - length: 5 + - truncate: + source: "message" + length: 5 sink: - stdout: ``` @@ -62,7 +61,7 @@ pipeline: record_type: "event" format: "json" processor: - - trucate: + - truncate: source: "message" length: 5 start_at: 7 From 78d5cf600159e9c5104b865e90564ecf67988671 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 10 Jan 2024 20:57:18 +0000 Subject: [PATCH 09/10] Modified to allow more than one source keys in the config Signed-off-by: Krishna Kondaka --- .../processor/truncate/TruncateProcessor.java | 34 ++--- .../truncate/TruncateProcessorConfig.java | 11 +- .../TruncateProcessorConfigTests.java | 25 ++-- .../truncate/TruncateProcessorTests.java | 4 +- .../TruncateStringProcessorTests.java | 127 ------------------ 5 files changed, 42 insertions(+), 159 deletions(-) delete mode 100644 data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessorTests.java diff --git a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java index b93bf517c4..00bcd04e89 100644 --- a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java +++ b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java @@ -29,14 +29,14 @@ public class TruncateProcessor extends AbstractProcessor, Record sourceKeys; @DataPrepperPluginConstructor public TruncateProcessor(final PluginMetrics pluginMetrics, final TruncateProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { super(pluginMetrics); this.expressionEvaluator = expressionEvaluator; this.truncateWhen = config.getTruncateWhen(); - this.source = config.getSource(); + this.sourceKeys = config.getSourceKeys(); this.startIndex = config.getStartAt() == null ? 0 : config.getStartAt(); this.length = config.getLength(); } @@ -57,23 +57,25 @@ public Collection> doExecute(final Collection> recor if (truncateWhen != null && !expressionEvaluator.evaluateConditional(truncateWhen, recordEvent)) { continue; } - if (!recordEvent.containsKey(source)) { - continue; - } + for (String sourceKey: sourceKeys) { + if (!recordEvent.containsKey(sourceKey)) { + continue; + } - final Object value = recordEvent.get(source, Object.class); - if (value instanceof String) { - recordEvent.put(source, getTruncatedValue((String)value)); - } else if (value instanceof List) { - List result = new ArrayList<>(); - for (Object listItem: (List)value) { - if (listItem instanceof String) { - result.add(getTruncatedValue((String)listItem)); - } else { - result.add(listItem); + final Object value = recordEvent.get(sourceKey, Object.class); + if (value instanceof String) { + recordEvent.put(sourceKey, getTruncatedValue((String)value)); + } else if (value instanceof List) { + List result = new ArrayList<>(); + for (Object listItem: (List)value) { + if (listItem instanceof String) { + result.add(getTruncatedValue((String)listItem)); + } else { + result.add(listItem); + } } + recordEvent.put(sourceKey, result); } - recordEvent.put(source, result); } } diff --git a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java index 29f63e54fe..943cd4c1ba 100644 --- a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java +++ b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java @@ -10,10 +10,13 @@ import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.AssertTrue; +import java.util.List; + public class TruncateProcessorConfig { @NotEmpty @NotNull - private String source; + @JsonProperty("source_keys") + private List sourceKeys; @JsonProperty("length") private Integer length; @@ -24,8 +27,8 @@ public class TruncateProcessorConfig { @JsonProperty("truncate_when") private String truncateWhen; - public String getSource() { - return source; + public List getSourceKeys() { + return sourceKeys; } public Integer getStartAt() { @@ -36,7 +39,7 @@ public Integer getLength() { return length; } - @AssertTrue(message = "At least one of start_at or length or both must be specified and the values must be positive integers") + @AssertTrue(message = "At least one of source or source_keys must be specified. At least one of start_at or length or both must be specified and the values must be positive integers") public boolean isValidConfig() { if (length == null && startAt == null) { return false; diff --git a/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfigTests.java b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfigTests.java index 5643876ace..e910f618ee 100644 --- a/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfigTests.java +++ b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfigTests.java @@ -16,6 +16,7 @@ import org.apache.commons.lang3.RandomStringUtils; import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; +import java.util.List; import java.util.Random; class TruncateProcessorConfigTests { @@ -31,7 +32,7 @@ void setUp() { @Test void testDefaults() { - assertThat(truncateProcessorConfig.getSource(), equalTo(null)); + assertThat(truncateProcessorConfig.getSourceKeys(), equalTo(null)); assertThat(truncateProcessorConfig.getStartAt(), equalTo(null)); assertThat(truncateProcessorConfig.getLength(), equalTo(null)); assertThat(truncateProcessorConfig.getTruncateWhen(), equalTo(null)); @@ -40,21 +41,24 @@ void testDefaults() { @Test void testValidConfiguration_withStartAt() throws NoSuchFieldException, IllegalAccessException { String source = RandomStringUtils.randomAlphabetic(10); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "source", source); + List sourceKeys = List.of(source); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "sourceKeys", sourceKeys); int startAt = random.nextInt(100); setField(TruncateProcessorConfig.class, truncateProcessorConfig, "startAt", startAt); - assertThat(truncateProcessorConfig.getSource(), equalTo(source)); + assertThat(truncateProcessorConfig.getSourceKeys(), equalTo(sourceKeys)); assertThat(truncateProcessorConfig.getStartAt(), equalTo(startAt)); assertTrue(truncateProcessorConfig.isValidConfig()); } @Test void testValidConfiguration_withLength() throws NoSuchFieldException, IllegalAccessException { - String source = RandomStringUtils.randomAlphabetic(10); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "source", source); + String source1 = RandomStringUtils.randomAlphabetic(10); + String source2 = RandomStringUtils.randomAlphabetic(10); + List sourceKeys = List.of(source1, source2); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "sourceKeys", sourceKeys); int length = random.nextInt(100); setField(TruncateProcessorConfig.class, truncateProcessorConfig, "length", length); - assertThat(truncateProcessorConfig.getSource(), equalTo(source)); + assertThat(truncateProcessorConfig.getSourceKeys(), equalTo(sourceKeys)); assertThat(truncateProcessorConfig.getLength(), equalTo(length)); assertTrue(truncateProcessorConfig.isValidConfig()); } @@ -63,13 +67,14 @@ void testValidConfiguration_withLength() throws NoSuchFieldException, IllegalAcc void testValidConfiguration_withLength_withTruncateWhen() throws NoSuchFieldException, IllegalAccessException { String source = RandomStringUtils.randomAlphabetic(10); String condition = RandomStringUtils.randomAlphabetic(10); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "source", source); + List sourceKeys = List.of(source); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "sourceKeys", sourceKeys); int length = random.nextInt(100); int startAt = random.nextInt(100); setField(TruncateProcessorConfig.class, truncateProcessorConfig, "length", length); setField(TruncateProcessorConfig.class, truncateProcessorConfig, "startAt", startAt); setField(TruncateProcessorConfig.class, truncateProcessorConfig, "truncateWhen", condition); - assertThat(truncateProcessorConfig.getSource(), equalTo(source)); + assertThat(truncateProcessorConfig.getSourceKeys(), equalTo(sourceKeys)); assertThat(truncateProcessorConfig.getLength(), equalTo(length)); assertThat(truncateProcessorConfig.getStartAt(), equalTo(startAt)); assertThat(truncateProcessorConfig.getTruncateWhen(), equalTo(condition)); @@ -79,7 +84,7 @@ void testValidConfiguration_withLength_withTruncateWhen() throws NoSuchFieldExce @Test void testInvalidConfiguration_StartAt_Length_BothNull() throws NoSuchFieldException, IllegalAccessException { String source = RandomStringUtils.randomAlphabetic(10); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "source", source); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "sourceKeys", List.of(source)); assertFalse(truncateProcessorConfig.isValidConfig()); } @@ -88,7 +93,7 @@ void testInvalidConfiguration_StartAt_Length_Negative() throws NoSuchFieldExcept String source = RandomStringUtils.randomAlphabetic(10); int length = random.nextInt(100); int startAt = random.nextInt(100); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "source", source); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "sourceKeys", List.of(source)); setField(TruncateProcessorConfig.class, truncateProcessorConfig, "startAt", -startAt); assertFalse(truncateProcessorConfig.isValidConfig()); setField(TruncateProcessorConfig.class, truncateProcessorConfig, "startAt", startAt); diff --git a/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java index d3f2a22eca..856358bb69 100644 --- a/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java +++ b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java @@ -53,7 +53,7 @@ private TruncateProcessor createObjectUnderTest() { @ArgumentsSource(TruncateArgumentsProvider.class) void testTruncateProcessor(final Object messageValue, final Integer startAt, final Integer truncateLength, final Object truncatedMessage) { - when(config.getSource()).thenReturn("message"); + when(config.getSourceKeys()).thenReturn(List.of("message")); when(config.getStartAt()).thenReturn(startAt); when(config.getLength()).thenReturn(truncateLength); when(config.getTruncateWhen()).thenReturn(null); @@ -70,7 +70,7 @@ void test_event_is_the_same_when_truncateWhen_condition_returns_false() { final String truncateWhen = UUID.randomUUID().toString(); final String message = UUID.randomUUID().toString(); - when(config.getSource()).thenReturn("message"); + when(config.getSourceKeys()).thenReturn(List.of("message")); when(config.getStartAt()).thenReturn(null); when(config.getLength()).thenReturn(5); when(config.getTruncateWhen()).thenReturn(truncateWhen); diff --git a/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessorTests.java b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessorTests.java deleted file mode 100644 index 3380e7baed..0000000000 --- a/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateStringProcessorTests.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.processor.truncate; - -import org.opensearch.dataprepper.expression.ExpressionEvaluator; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.record.Record; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.extension.ExtensionContext; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import static org.junit.jupiter.params.provider.Arguments.arguments; -import org.junit.jupiter.params.provider.ArgumentsProvider; -import org.junit.jupiter.params.provider.ArgumentsSource; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.stream.Stream; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -class TruncateStringProcessorTests { - - @Mock - private PluginMetrics pluginMetrics; - - @Mock - private TruncateStringProcessorConfig config; - - @Mock - private ExpressionEvaluator expressionEvaluator; - - private TruncateStringProcessor createObjectUnderTest() { - return new TruncateStringProcessor(pluginMetrics, config, expressionEvaluator); - } - - @ParameterizedTest - @ArgumentsSource(TruncateStringArgumentsProvider.class) - void testTruncateStringProcessor(final Object messageValue, final Integer startAt, final Integer truncateLength, final Object truncatedMessage) { - - when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", startAt, truncateLength, null))); - - final TruncateStringProcessor truncateStringProcessor = createObjectUnderTest(); - final Record record = createEvent(messageValue); - final List> truncatedRecords = (List>) truncateStringProcessor.doExecute(Collections.singletonList(record)); - assertThat(truncatedRecords.get(0).getData().get("message", Object.class), notNullValue()); - assertThat(truncatedRecords.get(0).getData().get("message", Object.class), equalTo(truncatedMessage)); - } - - @Test - public void testInputValidation() { - assertThat(createEntry("message", null, null, null).hasStartAtOrLength(), equalTo(false)); - assertThat(createEntry("message", null, -5, null).hasStartAtOrLength(), equalTo(false)); - assertThat(createEntry("message", -5, null, null).hasStartAtOrLength(), equalTo(false)); - assertThat(createEntry("message", -5, -6, null).hasStartAtOrLength(), equalTo(false)); - - } - - @Test - void test_event_is_the_same_when_truncateWhen_condition_returns_false() { - final String truncateWhen = UUID.randomUUID().toString(); - final String message = UUID.randomUUID().toString(); - - when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", null, 5, truncateWhen))); - - final TruncateStringProcessor truncateStringProcessor = createObjectUnderTest(); - final Record record = createEvent(message); - when(expressionEvaluator.evaluateConditional(truncateWhen, record.getData())).thenReturn(false); - final List> truncatedRecords = (List>) truncateStringProcessor.doExecute(Collections.singletonList(record)); - - assertThat(truncatedRecords.get(0).getData().toMap(), equalTo(record.getData().toMap())); - } - - - private TruncateStringProcessorConfig.Entry createEntry(final String source, final Integer startAt, final Integer length, final String truncateWhen) { - return new TruncateStringProcessorConfig.Entry(source, startAt, length, truncateWhen); - } - - private Record createEvent(final Object message) { - final Map eventData = new HashMap<>(); - eventData.put("message", message); - return new Record<>(JacksonEvent.builder() - .withEventType("event") - .withData(eventData) - .build()); - } - - static class TruncateStringArgumentsProvider implements ArgumentsProvider { - - @Override - public Stream provideArguments(ExtensionContext context) { - return Stream.of( - arguments("hello,world,no-truncate", 0, 100, "hello,world,no-truncate"), - arguments("hello,world,no-truncate", 6, 100, "world,no-truncate"), - arguments("hello,world,no-truncate", 6, 16, "world,no-truncat"), - arguments("hello,world,no-truncate", 6, 17, "world,no-truncate"), - arguments("hello,world,no-truncate", 6, 18, "world,no-truncate"), - arguments("hello,world,no-truncate", 6, 5, "world"), - arguments("hello,world,no-truncate", 6, null, "world,no-truncate"), - - arguments("hello,world,no-truncate", null, 100, "hello,world,no-truncate"), - arguments("hello,world,truncate", null, 11, "hello,world"), - arguments("hello,world", null, 1, "h"), - arguments("hello", null, 0, ""), - arguments(List.of("hello_one", "hello_two", "hello_three"), null, 5, List.of("hello", "hello", "hello")), - arguments(List.of("hello_one", 2, "hello_three"), null, 5, List.of("hello", 2, "hello")) - ); - } - } - -} - From 960902952868fadaf941bdbb89117eb1825a89ea Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Thu, 11 Jan 2024 04:54:02 +0000 Subject: [PATCH 10/10] Modified to allow multiple entries under configuration Signed-off-by: Krishna Kondaka --- .../truncate-processor/README.md | 55 +++++------- .../processor/truncate/TruncateProcessor.java | 50 +++++------ .../truncate/TruncateProcessorConfig.java | 83 ++++++++++++------- .../TruncateProcessorConfigTests.java | 74 ++++++++++------- .../truncate/TruncateProcessorTests.java | 50 ++++++++--- 5 files changed, 179 insertions(+), 133 deletions(-) diff --git a/data-prepper-plugins/truncate-processor/README.md b/data-prepper-plugins/truncate-processor/README.md index 76a34dc431..e4ea59115d 100644 --- a/data-prepper-plugins/truncate-processor/README.md +++ b/data-prepper-plugins/truncate-processor/README.md @@ -13,8 +13,14 @@ pipeline: format: "json" processor: - truncate: - source: "message" - length: 5 + entries: + - source_keys: ["message1", "message2"] + length: 5 + - source_keys: ["info"] + length: 6 + start_at: 4 + - source_keys: ["log"] + start_at: 5 sink: - stdout: ``` @@ -22,35 +28,14 @@ pipeline: Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file. ```json -{"message": "hello,world"} +{"message1": "hello,world", "message2": "test message", "info", "new information", "log": "test log message"} ``` When you run Data Prepper with this `pipeline.yaml`, you should see the following output: ```json -{"message":["hello"]} -``` - -If the above yaml file has additional config of `start_at: 2`, then the output would be following: - -```json -{"message":["llo,w"]} -``` - -If the above yaml file has additional config of `start_at: 2`, and does not have `length: 5` in the config, then the output would be following: - -```json -{"message":["llo,world"]} -``` - -If the source has an list of strings, then the result will be an array of strings where each of the member of the list is truncated. The following input -```json -{"message": ["hello_one", "hello_two", "hello_three"]} -``` -is transformed to the following: - -```json -{"message": ["hello", "hello", "hello"]} +{"message1":"hello", "message2":"test ", "info":"inform", "log": "log message"} ``` +where `message1` and `message2` have input values truncated to length 5, starting from index 0, `info` input value truncated to length 6 starting from index 4 and `log` input value truncated at the front by 5 characters. Example configuration with `truncate_when` option: ```yaml @@ -62,10 +47,11 @@ pipeline: format: "json" processor: - truncate: - source: "message" - length: 5 - start_at: 7 - truncate_when: '/id == 1' + entries: + - source: ["message"] + length: 5 + start_at: 8 + truncate_when: '/id == 1' sink: - stdout: ``` @@ -82,8 +68,9 @@ the output would be ``` ### Configuration -* `source` - (required) - The key to be modified -* `truncate_when` - (optional) - a condition, when it is true the truncate operation is performed. -* `start_at` - (optional) - starting index of the string. Defaults to 0. -* `length` - (optional) - length of the string after truncation. Defaults to end of the string. +* `entries` - (required) - A list of entries to add to an event + * `source_keys` - (required) - The list of key to be modified + * `truncate_when` - (optional) - a condition, when it is true the truncate operation is performed. + * `start_at` - (optional) - starting index of the string. Defaults to 0. + * `length` - (optional) - length of the string after truncation. Defaults to end of the string. Either `start_at` or `length` or both must be present diff --git a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java index 00bcd04e89..8449675791 100644 --- a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java +++ b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java @@ -26,22 +26,16 @@ @DataPrepperPlugin(name = "truncate", pluginType = Processor.class, pluginConfigurationType = TruncateProcessorConfig.class) public class TruncateProcessor extends AbstractProcessor, Record>{ private final ExpressionEvaluator expressionEvaluator; - private final String truncateWhen; - private final int startIndex; - private final Integer length; - private final List sourceKeys; + private final List entries; @DataPrepperPluginConstructor public TruncateProcessor(final PluginMetrics pluginMetrics, final TruncateProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { super(pluginMetrics); this.expressionEvaluator = expressionEvaluator; - this.truncateWhen = config.getTruncateWhen(); - this.sourceKeys = config.getSourceKeys(); - this.startIndex = config.getStartAt() == null ? 0 : config.getStartAt(); - this.length = config.getLength(); + this.entries = config.getEntries(); } - private String getTruncatedValue(final String value) { + private String getTruncatedValue(final String value, final int startIndex, final Integer length) { String truncatedValue = (length == null || startIndex+length >= value.length()) ? value.substring(startIndex) : @@ -54,27 +48,33 @@ private String getTruncatedValue(final String value) { public Collection> doExecute(final Collection> records) { for(final Record record : records) { final Event recordEvent = record.getData(); - if (truncateWhen != null && !expressionEvaluator.evaluateConditional(truncateWhen, recordEvent)) { - continue; - } - for (String sourceKey: sourceKeys) { - if (!recordEvent.containsKey(sourceKey)) { + for (TruncateProcessorConfig.Entry entry: entries) { + final List sourceKeys = entry.getSourceKeys(); + final String truncateWhen = entry.getTruncateWhen(); + final int startIndex = entry.getStartAt() == null ? 0 : entry.getStartAt(); + final Integer length = entry.getLength(); + if (truncateWhen != null && !expressionEvaluator.evaluateConditional(truncateWhen, recordEvent)) { continue; } + for (String sourceKey: sourceKeys) { + if (!recordEvent.containsKey(sourceKey)) { + continue; + } - final Object value = recordEvent.get(sourceKey, Object.class); - if (value instanceof String) { - recordEvent.put(sourceKey, getTruncatedValue((String)value)); - } else if (value instanceof List) { - List result = new ArrayList<>(); - for (Object listItem: (List)value) { - if (listItem instanceof String) { - result.add(getTruncatedValue((String)listItem)); - } else { - result.add(listItem); + final Object value = recordEvent.get(sourceKey, Object.class); + if (value instanceof String) { + recordEvent.put(sourceKey, getTruncatedValue((String)value, startIndex, length)); + } else if (value instanceof List) { + List result = new ArrayList<>(); + for (Object listItem: (List)value) { + if (listItem instanceof String) { + result.add(getTruncatedValue((String)listItem, startIndex, length)); + } else { + result.add(listItem); + } } + recordEvent.put(sourceKey, result); } - recordEvent.put(sourceKey, result); } } } diff --git a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java index 943cd4c1ba..1172da77b6 100644 --- a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java +++ b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java @@ -9,50 +9,73 @@ import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.AssertTrue; +import jakarta.validation.Valid; import java.util.List; public class TruncateProcessorConfig { - @NotEmpty - @NotNull - @JsonProperty("source_keys") - private List sourceKeys; + public static class Entry { + @NotEmpty + @NotNull + @JsonProperty("source_keys") + private List sourceKeys; - @JsonProperty("length") - private Integer length; + @JsonProperty("start_at") + private Integer startAt; - @JsonProperty("start_at") - private Integer startAt; + @JsonProperty("length") + private Integer length; - @JsonProperty("truncate_when") - private String truncateWhen; + @JsonProperty("truncate_when") + private String truncateWhen; - public List getSourceKeys() { - return sourceKeys; - } + public Entry(final List sourceKeys, final Integer startAt, final Integer length, final String truncateWhen) { + this.sourceKeys = sourceKeys; + this.startAt = startAt; + this.length = length; + this.truncateWhen = truncateWhen; + } - public Integer getStartAt() { - return startAt; - } + public Entry() {} - public Integer getLength() { - return length; - } + public List getSourceKeys() { + return sourceKeys; + } - @AssertTrue(message = "At least one of source or source_keys must be specified. At least one of start_at or length or both must be specified and the values must be positive integers") - public boolean isValidConfig() { - if (length == null && startAt == null) { - return false; + public Integer getStartAt() { + return startAt; } - if (length != null && length < 0) { - return false; + + public Integer getLength() { + return length; } - if (startAt != null && startAt < 0) { - return false; + + public String getTruncateWhen() { + return truncateWhen; } - return true; - } - public String getTruncateWhen() { return truncateWhen; } + @AssertTrue(message = "source_keys must be specified and at least one of start_at or length or both must be specified and the values must be positive integers") + public boolean isValidConfig() { + if (length == null && startAt == null) { + return false; + } + if (length != null && length < 0) { + return false; + } + if (startAt != null && startAt < 0) { + return false; + } + return true; + } + } + + @NotEmpty + @NotNull + private List<@Valid Entry> entries; + + public List getEntries() { + return entries; + } + } diff --git a/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfigTests.java b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfigTests.java index e910f618ee..3032db6b8b 100644 --- a/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfigTests.java +++ b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfigTests.java @@ -32,72 +32,84 @@ void setUp() { @Test void testDefaults() { - assertThat(truncateProcessorConfig.getSourceKeys(), equalTo(null)); - assertThat(truncateProcessorConfig.getStartAt(), equalTo(null)); - assertThat(truncateProcessorConfig.getLength(), equalTo(null)); - assertThat(truncateProcessorConfig.getTruncateWhen(), equalTo(null)); + assertThat(truncateProcessorConfig.getEntries(), equalTo(null)); + } + + @Test + void testEntryDefaults() { + TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry(); + assertThat(entry.getStartAt(), equalTo(null)); + assertThat(entry.getLength(), equalTo(null)); + assertThat(entry.getTruncateWhen(), equalTo(null)); } @Test void testValidConfiguration_withStartAt() throws NoSuchFieldException, IllegalAccessException { + TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry(); String source = RandomStringUtils.randomAlphabetic(10); List sourceKeys = List.of(source); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "sourceKeys", sourceKeys); + setField(TruncateProcessorConfig.Entry.class, entry, "sourceKeys", sourceKeys); int startAt = random.nextInt(100); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "startAt", startAt); - assertThat(truncateProcessorConfig.getSourceKeys(), equalTo(sourceKeys)); - assertThat(truncateProcessorConfig.getStartAt(), equalTo(startAt)); - assertTrue(truncateProcessorConfig.isValidConfig()); + setField(TruncateProcessorConfig.Entry.class, entry, "startAt", startAt); + assertThat(entry.getSourceKeys(), equalTo(sourceKeys)); + assertThat(entry.getStartAt(), equalTo(startAt)); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "entries", List.of(entry)); + assertTrue(entry.isValidConfig()); } @Test void testValidConfiguration_withLength() throws NoSuchFieldException, IllegalAccessException { + TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry(); String source1 = RandomStringUtils.randomAlphabetic(10); String source2 = RandomStringUtils.randomAlphabetic(10); List sourceKeys = List.of(source1, source2); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "sourceKeys", sourceKeys); + setField(TruncateProcessorConfig.Entry.class, entry, "sourceKeys", sourceKeys); int length = random.nextInt(100); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "length", length); - assertThat(truncateProcessorConfig.getSourceKeys(), equalTo(sourceKeys)); - assertThat(truncateProcessorConfig.getLength(), equalTo(length)); - assertTrue(truncateProcessorConfig.isValidConfig()); + setField(TruncateProcessorConfig.Entry.class, entry, "length", length); + assertThat(entry.getSourceKeys(), equalTo(sourceKeys)); + assertThat(entry.getLength(), equalTo(length)); + setField(TruncateProcessorConfig.class, truncateProcessorConfig, "entries", List.of(entry)); + assertTrue(entry.isValidConfig()); } @Test void testValidConfiguration_withLength_withTruncateWhen() throws NoSuchFieldException, IllegalAccessException { + TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry(); String source = RandomStringUtils.randomAlphabetic(10); String condition = RandomStringUtils.randomAlphabetic(10); List sourceKeys = List.of(source); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "sourceKeys", sourceKeys); + setField(TruncateProcessorConfig.Entry.class, entry, "sourceKeys", sourceKeys); int length = random.nextInt(100); int startAt = random.nextInt(100); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "length", length); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "startAt", startAt); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "truncateWhen", condition); - assertThat(truncateProcessorConfig.getSourceKeys(), equalTo(sourceKeys)); - assertThat(truncateProcessorConfig.getLength(), equalTo(length)); - assertThat(truncateProcessorConfig.getStartAt(), equalTo(startAt)); - assertThat(truncateProcessorConfig.getTruncateWhen(), equalTo(condition)); - assertTrue(truncateProcessorConfig.isValidConfig()); + setField(TruncateProcessorConfig.Entry.class, entry, "length", length); + setField(TruncateProcessorConfig.Entry.class, entry, "startAt", startAt); + setField(TruncateProcessorConfig.Entry.class, entry, "truncateWhen", condition); + assertThat(entry.getSourceKeys(), equalTo(sourceKeys)); + assertThat(entry.getLength(), equalTo(length)); + assertThat(entry.getStartAt(), equalTo(startAt)); + assertThat(entry.getTruncateWhen(), equalTo(condition)); + assertTrue(entry.isValidConfig()); } @Test void testInvalidConfiguration_StartAt_Length_BothNull() throws NoSuchFieldException, IllegalAccessException { + TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry(); String source = RandomStringUtils.randomAlphabetic(10); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "sourceKeys", List.of(source)); - assertFalse(truncateProcessorConfig.isValidConfig()); + setField(TruncateProcessorConfig.Entry.class, entry, "sourceKeys", List.of(source)); + assertFalse(entry.isValidConfig()); } @Test void testInvalidConfiguration_StartAt_Length_Negative() throws NoSuchFieldException, IllegalAccessException { + TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry(); String source = RandomStringUtils.randomAlphabetic(10); int length = random.nextInt(100); int startAt = random.nextInt(100); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "sourceKeys", List.of(source)); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "startAt", -startAt); - assertFalse(truncateProcessorConfig.isValidConfig()); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "startAt", startAt); - setField(TruncateProcessorConfig.class, truncateProcessorConfig, "length", -length); - assertFalse(truncateProcessorConfig.isValidConfig()); + setField(TruncateProcessorConfig.Entry.class, entry, "sourceKeys", List.of(source)); + setField(TruncateProcessorConfig.Entry.class, entry, "startAt", -startAt); + assertFalse(entry.isValidConfig()); + setField(TruncateProcessorConfig.Entry.class, entry, "startAt", startAt); + setField(TruncateProcessorConfig.Entry.class, entry, "length", -length); + assertFalse(entry.isValidConfig()); } } diff --git a/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java index 856358bb69..7717181864 100644 --- a/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java +++ b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java @@ -53,40 +53,52 @@ private TruncateProcessor createObjectUnderTest() { @ArgumentsSource(TruncateArgumentsProvider.class) void testTruncateProcessor(final Object messageValue, final Integer startAt, final Integer truncateLength, final Object truncatedMessage) { - when(config.getSourceKeys()).thenReturn(List.of("message")); - when(config.getStartAt()).thenReturn(startAt); - when(config.getLength()).thenReturn(truncateLength); - when(config.getTruncateWhen()).thenReturn(null); - + when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(List.of("message"), startAt, truncateLength, null))); final TruncateProcessor truncateProcessor = createObjectUnderTest(); - final Record record = createEvent(messageValue); + final Record record = createEvent("message", messageValue); final List> truncatedRecords = (List>) truncateProcessor.doExecute(Collections.singletonList(record)); assertThat(truncatedRecords.get(0).getData().get("message", Object.class), notNullValue()); assertThat(truncatedRecords.get(0).getData().get("message", Object.class), equalTo(truncatedMessage)); } + @ParameterizedTest + @ArgumentsSource(MultipleTruncateArgumentsProvider.class) + void testTruncateProcessorMultipleEntries(final Object messageValue, final Integer startAt1, final Integer truncateLength1, final Integer startAt2, final Integer truncateLength2, final Object truncatedMessage1, final Object truncatedMessage2) { + TruncateProcessorConfig.Entry entry1 = createEntry(List.of("message1"), startAt1, truncateLength1, null); + TruncateProcessorConfig.Entry entry2 = createEntry(List.of("message2"), startAt2, truncateLength2, null); + when(config.getEntries()).thenReturn(List.of(entry1, entry2)); + final Record record1 = createEvent("message1", messageValue); + final Record record2 = createEvent("message2", messageValue); + final TruncateProcessor truncateProcessor = createObjectUnderTest(); + final List> truncatedRecords = (List>) truncateProcessor.doExecute(List.of(record1, record2)); + assertThat(truncatedRecords.get(0).getData().get("message1", Object.class), notNullValue()); + assertThat(truncatedRecords.get(1).getData().get("message2", Object.class), notNullValue()); + assertThat(truncatedRecords.get(0).getData().get("message1", Object.class), equalTo(truncatedMessage1)); + assertThat(truncatedRecords.get(1).getData().get("message2", Object.class), equalTo(truncatedMessage2)); + } + @Test void test_event_is_the_same_when_truncateWhen_condition_returns_false() { final String truncateWhen = UUID.randomUUID().toString(); final String message = UUID.randomUUID().toString(); - when(config.getSourceKeys()).thenReturn(List.of("message")); - when(config.getStartAt()).thenReturn(null); - when(config.getLength()).thenReturn(5); - when(config.getTruncateWhen()).thenReturn(truncateWhen); + when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(List.of("message"), null, 5, truncateWhen))); final TruncateProcessor truncateProcessor = createObjectUnderTest(); - final Record record = createEvent(message); + final Record record = createEvent("message", message); when(expressionEvaluator.evaluateConditional(truncateWhen, record.getData())).thenReturn(false); final List> truncatedRecords = (List>) truncateProcessor.doExecute(Collections.singletonList(record)); assertThat(truncatedRecords.get(0).getData().toMap(), equalTo(record.getData().toMap())); } + private TruncateProcessorConfig.Entry createEntry(final List sourceKeys, final Integer startAt, final Integer length, final String truncateWhen) { + return new TruncateProcessorConfig.Entry(sourceKeys, startAt, length, truncateWhen); + } - private Record createEvent(final Object message) { + private Record createEvent(final String key, final Object value) { final Map eventData = new HashMap<>(); - eventData.put("message", message); + eventData.put(key, value); return new Record<>(JacksonEvent.builder() .withEventType("event") .withData(eventData) @@ -116,5 +128,17 @@ public Stream provideArguments(ExtensionContext context) { } } + static class MultipleTruncateArgumentsProvider implements ArgumentsProvider { + + @Override + public Stream provideArguments(ExtensionContext context) { + return Stream.of( + arguments("hello,world,no-truncate", 0, 100, 1, 10, "hello,world,no-truncate", "ello,world"), + arguments("hello,world,no-truncate", 6, 100, null, 5, "world,no-truncate", "hello"), + arguments("hello,world,no-truncate", 6, 16, 12, null, "world,no-truncat", "no-truncate") + ); + } + } + }