Skip to content

Commit

Permalink
Add support for epoch timestamps and configurable output format (#3860)
Browse files Browse the repository at this point in the history
* Add support for epoch timestamps and configurable output format

Signed-off-by: Krishna Kondaka <[email protected]>

* Add support for epoch timestamps and configurable output format

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Dec 20, 2023
1 parent df2bde6 commit f19de03
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 8 deletions.
3 changes: 3 additions & 0 deletions data-prepper-plugins/date-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ valid key and at least one pattern is required if match is configured.
* `patterns`: List of possible patterns the timestamp value of key can have. The patterns are based on sequence of letters and symbols.
The `patterns` support all the patterns listed in Java
[DatetimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
and also supports `epoch_second`, `epoch_milli` and `epoch_nano` values which represents the timestamp as the number of seconds, milliseconds and nano seconds since epoch. Epoch values are always UTC time zone.
* Type: `List<String>`

The following example of date configuration will use `timestamp` key to match against given patterns and stores the timestamp in ISO 8601
Expand Down Expand Up @@ -106,6 +107,8 @@ processor:

* `to_origination_metadata` (Optional): When this option is used, matched time is put into the event's metadata as an instance of `Instant`.

* `output_format` (Optional): indicates the format of the `@timestamp`. Default is `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`.

## Metrics

* `dateProcessingMatchSuccessCounter`: Number of records that match with at least one pattern specified in match configuration option.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -33,12 +35,16 @@
public class DateProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(DateProcessor.class);
private static final String OUTPUT_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
private static final int LENGTH_OF_EPOCH_IN_MILLIS = 13;
private static final int LENGTH_OF_EPOCH_SECONDS = 10;

static final String DATE_PROCESSING_MATCH_SUCCESS = "dateProcessingMatchSuccess";
static final String DATE_PROCESSING_MATCH_FAILURE = "dateProcessingMatchFailure";

private String keyToParse;
private List<DateTimeFormatter> dateTimeFormatters;
private Set<String> epochFormatters;
private String outputFormat;
private final DateProcessorConfig dateProcessorConfig;
private final ExpressionEvaluator expressionEvaluator;

Expand All @@ -50,6 +56,7 @@ public DateProcessor(PluginMetrics pluginMetrics, final DateProcessorConfig date
super(pluginMetrics);
this.dateProcessorConfig = dateProcessorConfig;
this.expressionEvaluator = expressionEvaluator;
this.outputFormat = dateProcessorConfig.getOutputFormat();

dateProcessingMatchSuccessCounter = pluginMetrics.counter(DATE_PROCESSING_MATCH_SUCCESS);
dateProcessingMatchFailureCounter = pluginMetrics.counter(DATE_PROCESSING_MATCH_FAILURE);
Expand All @@ -68,10 +75,10 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {

String zonedDateTime = null;

if (Boolean.TRUE.equals(dateProcessorConfig.getFromTimeReceived()))
if (Boolean.TRUE.equals(dateProcessorConfig.getFromTimeReceived())) {
zonedDateTime = getDateTimeFromTimeReceived(record);

else if (keyToParse != null && !keyToParse.isEmpty()) {
} else if (keyToParse != null && !keyToParse.isEmpty()) {
Pair<String, Instant> result = getDateTimeFromMatch(record);
if (result != null) {
zonedDateTime = result.getLeft();
Expand All @@ -85,8 +92,9 @@ else if (keyToParse != null && !keyToParse.isEmpty()) {
populateDateProcessorMetrics(zonedDateTime);
}

if (zonedDateTime != null)
if (zonedDateTime != null) {
record.getData().put(dateProcessorConfig.getDestination(), zonedDateTime);
}
}
return records;
}
Expand All @@ -101,7 +109,8 @@ private void populateDateProcessorMetrics(final String zonedDateTime) {
private void extractKeyAndFormatters() {
for (DateProcessorConfig.DateMatch entry: dateProcessorConfig.getMatch()) {
keyToParse = entry.getKey();
dateTimeFormatters = entry.getPatterns().stream().map(this::getSourceFormatter).collect(Collectors.toList());
epochFormatters = entry.getPatterns().stream().filter(pattern -> pattern.contains("epoch")).collect(Collectors.toSet());
dateTimeFormatters = entry.getPatterns().stream().filter(pattern -> !pattern.contains("epoch")).map(this::getSourceFormatter).collect(Collectors.toList());
}
}

Expand Down Expand Up @@ -146,11 +155,71 @@ private String getSourceTimestamp(final Record<Event> record) {
}
}

private Pair<String, Instant> getEpochFormatOutput(Instant time) {
if (outputFormat.equals("epoch_second")) {
return Pair.of(Long.toString(time.getEpochSecond()), time);
} else if (outputFormat.equals("epoch_milli")) {
return Pair.of(Long.toString(time.toEpochMilli()), time);
} else { // epoch_nano. validation for valid epoch_ should be
// done at init time
long nano = (long)time.getEpochSecond() * 1000_000_000 + (long) time.getNano();
return Pair.of(Long.toString(nano), time);
}
}

private Pair<String, Instant> getFormattedDateTimeString(final String sourceTimestamp) {
ZoneId srcZoneId = dateProcessorConfig.getSourceZoneId();
ZoneId dstZoneId = dateProcessorConfig.getDestinationZoneId();
Long numberValue = null;
Instant epochTime;

if (epochFormatters.size() > 0) {
try {
numberValue = Long.parseLong(sourceTimestamp);
} catch (NumberFormatException e) {
numberValue = null;
}
}
if (numberValue != null) {
int timestampLength = sourceTimestamp.length();
if (timestampLength > LENGTH_OF_EPOCH_IN_MILLIS) {
if (epochFormatters.contains("epoch_nano")) {
epochTime = Instant.ofEpochSecond(numberValue/1000_000_000, numberValue % 1000_000_000);
} else {
LOG.warn("Source time value is larger than epoch pattern configured. epoch_nano is expected but not present in the patterns list");
return null;
}
} else if (timestampLength > LENGTH_OF_EPOCH_SECONDS) {
if (epochFormatters.contains("epoch_milli")) {
epochTime = Instant.ofEpochMilli(numberValue);
} else {
LOG.warn("Source time value is larger than epoch pattern configured. epoch_milli is expected but not present in the patterns list");
return null;
}
} else {
epochTime = Instant.ofEpochSecond(numberValue);
}
// Epochs are always UTC zone
srcZoneId = ZoneId.of("UTC");
try {
if (outputFormat.startsWith("epoch_")) {
return getEpochFormatOutput(epochTime);
} else {
DateTimeFormatter outputFormatter = getOutputFormatter().withZone(dstZoneId);
ZonedDateTime tmp = ZonedDateTime.ofInstant(epochTime, srcZoneId);
return Pair.of(tmp.format(outputFormatter.withZone(dstZoneId)), tmp.toInstant());
}
} catch (Exception ignored) {
}
}

for (DateTimeFormatter formatter : dateTimeFormatters) {
try {
ZonedDateTime tmp = ZonedDateTime.parse(sourceTimestamp, formatter);
return Pair.of(tmp.format(getOutputFormatter().withZone(dateProcessorConfig.getDestinationZoneId())), tmp.toInstant());
if (outputFormat.startsWith("epoch_")) {
return getEpochFormatOutput(tmp.toInstant());
}
return Pair.of(tmp.format(getOutputFormatter().withZone(dstZoneId)), tmp.toInstant());
} catch (Exception ignored) {
}
}
Expand All @@ -160,7 +229,7 @@ private Pair<String, Instant> getFormattedDateTimeString(final String sourceTime
}

private DateTimeFormatter getOutputFormatter() {
return DateTimeFormatter.ofPattern(OUTPUT_FORMAT);
return DateTimeFormatter.ofPattern(outputFormat);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import java.time.ZoneId;
import java.util.List;
import java.util.Locale;
import java.time.format.DateTimeFormatter;

public class DateProcessorConfig {
static final Boolean DEFAULT_FROM_TIME_RECEIVED = false;
static final Boolean DEFAULT_TO_ORIGINATION_METADATA = false;
static final String DEFAULT_DESTINATION = "@timestamp";
static final String DEFAULT_OUTPUT_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
static final String DEFAULT_SOURCE_TIMEZONE = ZoneId.systemDefault().toString();
static final String DEFAULT_DESTINATION_TIMEZONE = ZoneId.systemDefault().toString();

Expand All @@ -41,6 +43,41 @@ public String getKey() {
public List<String> getPatterns() {
return patterns;
}

@JsonIgnore
public boolean isValidPatterns() {
// For now, allow only one of the three "epoch_" pattern
int count = 0;
for (final String pattern: patterns) {
if (pattern.startsWith("epoch_")) {
count++;
}
if (count > 1) {
return false;
}
}
for (final String pattern: patterns) {
if (!isValidPattern(pattern)) {
return false;
}
}
return true;
}

public static boolean isValidPattern(final String pattern) {
if (pattern.equals("epoch_second") ||
pattern.equals("epoch_milli") ||
pattern.equals("epoch_nano")) {
return true;
}
try {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
return true;
} catch (Exception e) {
return false;
}
}

}

@JsonProperty("from_time_received")
Expand All @@ -55,6 +92,9 @@ public List<String> getPatterns() {
@JsonProperty("destination")
private String destination = DEFAULT_DESTINATION;

@JsonProperty("output_format")
private String outputFormat = DEFAULT_OUTPUT_FORMAT;

@JsonProperty("source_timezone")
private String sourceTimezone = DEFAULT_SOURCE_TIMEZONE;

Expand All @@ -76,6 +116,10 @@ public List<String> getPatterns() {
@JsonIgnore
private Locale sourceLocale;

public String getOutputFormat() {
return outputFormat;
}

public Boolean getFromTimeReceived() {
return fromTimeReceived;
}
Expand Down Expand Up @@ -160,15 +204,20 @@ boolean isValidMatch() {
if (match.size() != 1)
return false;

return match.get(0).getPatterns() != null && !match.get(0).getPatterns().isEmpty();
return match.get(0).getPatterns() != null && !match.get(0).getPatterns().isEmpty() && match.get(0).isValidPatterns();
}
return true;
}

@AssertTrue(message = "Invalid output format.")
boolean isValidOutputFormat() {
return DateMatch.isValidPattern(outputFormat);
}

@AssertTrue(message = "Invalid source_timezone provided.")
boolean isSourceTimezoneValid() {
try {
sourceZoneId = buildZoneId(sourceTimezone);
sourceZoneId = buildZoneId(sourceTimezone);
return true;
} catch (Exception e) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -42,6 +43,7 @@ class Validation {
void setUp() {
random = UUID.randomUUID().toString();
mockDateMatch = mock(DateProcessorConfig.DateMatch.class);
when(mockDateMatch.isValidPatterns()).thenReturn(true);
}

@Test
Expand All @@ -67,6 +69,23 @@ void isValidMatchAndFromTimestampReceived_should_return_false_if_from_time_recei
assertThat(dateProcessorConfig.isValidMatchAndFromTimestampReceived(), equalTo(false));
}

@Test
void testValidAndInvalidOutputFormats() throws NoSuchFieldException, IllegalAccessException {
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", random);
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(false));

setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_second");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_milli");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_nano");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_xyz");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(false));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "yyyy-MM-dd'T'HH:mm:ss.nnnnnnnnnXXX");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
}

@Test
void isValidMatch_should_return_true_if_match_has_single_entry() throws NoSuchFieldException, IllegalAccessException {
when(mockDateMatch.getPatterns()).thenReturn(Collections.singletonList(random));
Expand All @@ -77,6 +96,16 @@ void isValidMatch_should_return_true_if_match_has_single_entry() throws NoSuchFi
assertThat(dateProcessorConfig.isValidMatch(), equalTo(true));
}

@Test
void isValidMatch_should_return_false_if_match_has_multiple_epoch_patterns() throws NoSuchFieldException, IllegalAccessException {
when(mockDateMatch.getPatterns()).thenReturn(List.of("epoch_second", "epoch_milli"));

List<DateProcessorConfig.DateMatch> dateMatches = Arrays.asList(mockDateMatch, mockDateMatch);
reflectivelySetField(dateProcessorConfig, "match", dateMatches);

assertThat(dateProcessorConfig.isValidMatch(), equalTo(false));
}

@Test
void isValidMatch_should_return_false_if_match_has_multiple_entries() throws NoSuchFieldException, IllegalAccessException {
when(mockDateMatch.getPatterns()).thenReturn(Collections.singletonList(random));
Expand Down
Loading

0 comments on commit f19de03

Please sign in to comment.