Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add truncate string processor #3924

Merged
merged 10 commits into from
Jan 11, 2024
Merged
76 changes: 76 additions & 0 deletions data-prepper-plugins/truncate-processor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Truncate Processor

This is a processor that truncates key's value at the beginning or at the end or at both sides of a string as per the configuration. If the key's value is a list, then each of the string members of the list are truncated. Non-string members of the list are left untouched. If `truncate_when` option is provided, the truncation of the input is done only when the condition specified is true for the event being processed.

## Basic Usage
To get started, create the following `pipeline.yaml`.
```yaml
pipeline:
source:
file:
path: "/full/path/to/logs_json.log"
record_type: "event"
format: "json"
processor:
- truncate:
entries:
- source_keys: ["message1", "message2"]
length: 5
- source_keys: ["info"]
length: 6
start_at: 4
- source_keys: ["log"]
start_at: 5
sink:
- stdout:
```

Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.

```json
{"message1": "hello,world", "message2": "test message", "info", "new information", "log": "test log message"}
```
When you run Data Prepper with this `pipeline.yaml`, you should see the following output:

```json
{"message1":"hello", "message2":"test ", "info":"inform", "log": "log message"}
```
where `message1` and `message2` have input values truncated to length 5, starting from index 0, `info` input value truncated to length 6 starting from index 4 and `log` input value truncated at the front by 5 characters.

Example configuration with `truncate_when` option:
```yaml
pipeline:
source:
file:
path: "/full/path/to/logs_json.log"
record_type: "event"
format: "json"
processor:
- truncate:
entries:
- source: ["message"]
length: 5
start_at: 8
truncate_when: '/id == 1'
sink:
- stdout:
```

When the pipeline started with the above configuration receives the following two events
```json
{"message": "hello, world", "id": 1}
{"message": "hello, world,not-truncated", "id": 2}
```
the output would be
```json
{"message": "world", "id": 1}
{"message": "hello, world,not-truncated", "id": 2}
```

### Configuration
* `entries` - (required) - A list of entries to add to an event
* `source_keys` - (required) - The list of key to be modified
* `truncate_when` - (optional) - a condition, when it is true the truncate operation is performed.
* `start_at` - (optional) - starting index of the string. Defaults to 0.
* `length` - (optional) - length of the string after truncation. Defaults to end of the string.
Either `start_at` or `length` or both must be present
17 changes: 17 additions & 0 deletions data-prepper-plugins/truncate-processor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-test-common')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation libs.commons.lang3
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.truncate;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;

import java.util.Collection;
import java.util.ArrayList;
import java.util.List;

/**
* This processor takes in a key and truncates its value to a string with
* characters from the front or at the end or at both removed.
* If the value is not a string, no action is performed.
*/
@DataPrepperPlugin(name = "truncate", pluginType = Processor.class, pluginConfigurationType = TruncateProcessorConfig.class)
public class TruncateProcessor extends AbstractProcessor<Record<Event>, Record<Event>>{
private final ExpressionEvaluator expressionEvaluator;
private final List<TruncateProcessorConfig.Entry> entries;

@DataPrepperPluginConstructor
public TruncateProcessor(final PluginMetrics pluginMetrics, final TruncateProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.expressionEvaluator = expressionEvaluator;
this.entries = config.getEntries();
}

private String getTruncatedValue(final String value, final int startIndex, final Integer length) {
String truncatedValue =
(length == null || startIndex+length >= value.length()) ?
value.substring(startIndex) :
value.substring(startIndex, startIndex + length);

return truncatedValue;
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for(final Record<Event> record : records) {
final Event recordEvent = record.getData();
for (TruncateProcessorConfig.Entry entry: entries) {
final List<String> sourceKeys = entry.getSourceKeys();
final String truncateWhen = entry.getTruncateWhen();
final int startIndex = entry.getStartAt() == null ? 0 : entry.getStartAt();
final Integer length = entry.getLength();
if (truncateWhen != null && !expressionEvaluator.evaluateConditional(truncateWhen, recordEvent)) {
continue;
}
for (String sourceKey: sourceKeys) {
if (!recordEvent.containsKey(sourceKey)) {
continue;
}

final Object value = recordEvent.get(sourceKey, Object.class);
if (value instanceof String) {
recordEvent.put(sourceKey, getTruncatedValue((String)value, startIndex, length));
} else if (value instanceof List) {
List<Object> result = new ArrayList<>();
for (Object listItem: (List)value) {
if (listItem instanceof String) {
result.add(getTruncatedValue((String)listItem, startIndex, length));
} else {
result.add(listItem);
}
}
recordEvent.put(sourceKey, result);
}
}
}
}

return records;
}

@Override
public void prepareForShutdown() {

}

@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {

}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.truncate;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.Valid;

import java.util.List;

public class TruncateProcessorConfig {
public static class Entry {
@NotEmpty
@NotNull
@JsonProperty("source_keys")
private List<String> sourceKeys;

@JsonProperty("start_at")
private Integer startAt;

@JsonProperty("length")
private Integer length;

@JsonProperty("truncate_when")
private String truncateWhen;

public Entry(final List<String> sourceKeys, final Integer startAt, final Integer length, final String truncateWhen) {
this.sourceKeys = sourceKeys;
this.startAt = startAt;
this.length = length;
this.truncateWhen = truncateWhen;
}

public Entry() {}

public List<String> getSourceKeys() {
return sourceKeys;
}

public Integer getStartAt() {
return startAt;
}

public Integer getLength() {
return length;
}

public String getTruncateWhen() {
return truncateWhen;
}

@AssertTrue(message = "source_keys must be specified and at least one of start_at or length or both must be specified and the values must be positive integers")
public boolean isValidConfig() {
if (length == null && startAt == null) {
return false;
}
if (length != null && length < 0) {
return false;
}
if (startAt != null && startAt < 0) {
return false;
}
return true;
}
}

@NotEmpty
@NotNull
private List<@Valid Entry> entries;

public List<Entry> getEntries() {
return entries;
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.truncate;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeEach;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;

import org.apache.commons.lang3.RandomStringUtils;
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;

import java.util.List;
import java.util.Random;

class TruncateProcessorConfigTests {
TruncateProcessorConfig truncateProcessorConfig;

Random random;

@BeforeEach
void setUp() {
truncateProcessorConfig = new TruncateProcessorConfig();
random = new Random();
}

@Test
void testDefaults() {
assertThat(truncateProcessorConfig.getEntries(), equalTo(null));
}

@Test
void testEntryDefaults() {
TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry();
assertThat(entry.getStartAt(), equalTo(null));
assertThat(entry.getLength(), equalTo(null));
assertThat(entry.getTruncateWhen(), equalTo(null));
}

@Test
void testValidConfiguration_withStartAt() throws NoSuchFieldException, IllegalAccessException {
TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry();
String source = RandomStringUtils.randomAlphabetic(10);
List<String> sourceKeys = List.of(source);
setField(TruncateProcessorConfig.Entry.class, entry, "sourceKeys", sourceKeys);
int startAt = random.nextInt(100);
setField(TruncateProcessorConfig.Entry.class, entry, "startAt", startAt);
assertThat(entry.getSourceKeys(), equalTo(sourceKeys));
assertThat(entry.getStartAt(), equalTo(startAt));
setField(TruncateProcessorConfig.class, truncateProcessorConfig, "entries", List.of(entry));
assertTrue(entry.isValidConfig());
}

@Test
void testValidConfiguration_withLength() throws NoSuchFieldException, IllegalAccessException {
TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry();
String source1 = RandomStringUtils.randomAlphabetic(10);
String source2 = RandomStringUtils.randomAlphabetic(10);
List<String> sourceKeys = List.of(source1, source2);
setField(TruncateProcessorConfig.Entry.class, entry, "sourceKeys", sourceKeys);
int length = random.nextInt(100);
setField(TruncateProcessorConfig.Entry.class, entry, "length", length);
assertThat(entry.getSourceKeys(), equalTo(sourceKeys));
assertThat(entry.getLength(), equalTo(length));
setField(TruncateProcessorConfig.class, truncateProcessorConfig, "entries", List.of(entry));
assertTrue(entry.isValidConfig());
}

@Test
void testValidConfiguration_withLength_withTruncateWhen() throws NoSuchFieldException, IllegalAccessException {
TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry();
String source = RandomStringUtils.randomAlphabetic(10);
String condition = RandomStringUtils.randomAlphabetic(10);
List<String> sourceKeys = List.of(source);
setField(TruncateProcessorConfig.Entry.class, entry, "sourceKeys", sourceKeys);
int length = random.nextInt(100);
int startAt = random.nextInt(100);
setField(TruncateProcessorConfig.Entry.class, entry, "length", length);
setField(TruncateProcessorConfig.Entry.class, entry, "startAt", startAt);
setField(TruncateProcessorConfig.Entry.class, entry, "truncateWhen", condition);
assertThat(entry.getSourceKeys(), equalTo(sourceKeys));
assertThat(entry.getLength(), equalTo(length));
assertThat(entry.getStartAt(), equalTo(startAt));
assertThat(entry.getTruncateWhen(), equalTo(condition));
assertTrue(entry.isValidConfig());
}

@Test
void testInvalidConfiguration_StartAt_Length_BothNull() throws NoSuchFieldException, IllegalAccessException {
TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry();
String source = RandomStringUtils.randomAlphabetic(10);
setField(TruncateProcessorConfig.Entry.class, entry, "sourceKeys", List.of(source));
assertFalse(entry.isValidConfig());
}

@Test
void testInvalidConfiguration_StartAt_Length_Negative() throws NoSuchFieldException, IllegalAccessException {
TruncateProcessorConfig.Entry entry = new TruncateProcessorConfig.Entry();
String source = RandomStringUtils.randomAlphabetic(10);
int length = random.nextInt(100);
int startAt = random.nextInt(100);
setField(TruncateProcessorConfig.Entry.class, entry, "sourceKeys", List.of(source));
setField(TruncateProcessorConfig.Entry.class, entry, "startAt", -startAt);
assertFalse(entry.isValidConfig());
setField(TruncateProcessorConfig.Entry.class, entry, "startAt", startAt);
setField(TruncateProcessorConfig.Entry.class, entry, "length", -length);
assertFalse(entry.isValidConfig());
}
}
Loading
Loading