From f19de03d5418925935e019837fc4824fb250820c Mon Sep 17 00:00:00 2001 From: kkondaka <41027584+kkondaka@users.noreply.github.com> Date: Wed, 20 Dec 2023 11:58:57 -0800 Subject: [PATCH] Add support for epoch timestamps and configurable output format (#3860) * Add support for epoch timestamps and configurable output format Signed-off-by: Krishna Kondaka * Add support for epoch timestamps and configurable output format Signed-off-by: Krishna Kondaka * Addressed review comments Signed-off-by: Krishna Kondaka * Addressed review comments Signed-off-by: Krishna Kondaka --------- Signed-off-by: Krishna Kondaka Co-authored-by: Krishna Kondaka --- data-prepper-plugins/date-processor/README.md | 3 + .../plugins/processor/date/DateProcessor.java | 81 ++++++++++++-- .../processor/date/DateProcessorConfig.java | 53 +++++++++- .../date/DateProcessorConfigTest.java | 29 +++++ .../processor/date/DateProcessorTests.java | 100 ++++++++++++++++++ 5 files changed, 258 insertions(+), 8 deletions(-) diff --git a/data-prepper-plugins/date-processor/README.md b/data-prepper-plugins/date-processor/README.md index c4d9acd4f5..b23962faab 100644 --- a/data-prepper-plugins/date-processor/README.md +++ b/data-prepper-plugins/date-processor/README.md @@ -66,6 +66,7 @@ valid key and at least one pattern is required if match is configured. * `patterns`: List of possible patterns the timestamp value of key can have. The patterns are based on sequence of letters and symbols. The `patterns` support all the patterns listed in Java [DatetimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). + and also supports `epoch_second`, `epoch_milli` and `epoch_nano` values which represents the timestamp as the number of seconds, milliseconds and nano seconds since epoch. Epoch values are always UTC time zone. * Type: `List` The following example of date configuration will use `timestamp` key to match against given patterns and stores the timestamp in ISO 8601 @@ -106,6 +107,8 @@ processor: * `to_origination_metadata` (Optional): When this option is used, matched time is put into the event's metadata as an instance of `Instant`. +* `output_format` (Optional): indicates the format of the `@timestamp`. Default is `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`. + ## Metrics * `dateProcessingMatchSuccessCounter`: Number of records that match with at least one pattern specified in match configuration option. diff --git a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessor.java b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessor.java index d0808053b2..6d43cfbff2 100644 --- a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessor.java +++ b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessor.java @@ -20,11 +20,13 @@ import java.time.Instant; import java.time.LocalDate; import java.time.ZonedDateTime; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.time.temporal.ChronoField; import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.Objects; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; @@ -33,12 +35,16 @@ public class DateProcessor extends AbstractProcessor, Record> { private static final Logger LOG = LoggerFactory.getLogger(DateProcessor.class); private static final String OUTPUT_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; + private static final int LENGTH_OF_EPOCH_IN_MILLIS = 13; + private static final int LENGTH_OF_EPOCH_SECONDS = 10; static final String DATE_PROCESSING_MATCH_SUCCESS = "dateProcessingMatchSuccess"; static final String DATE_PROCESSING_MATCH_FAILURE = "dateProcessingMatchFailure"; private String keyToParse; private List dateTimeFormatters; + private Set epochFormatters; + private String outputFormat; private final DateProcessorConfig dateProcessorConfig; private final ExpressionEvaluator expressionEvaluator; @@ -50,6 +56,7 @@ public DateProcessor(PluginMetrics pluginMetrics, final DateProcessorConfig date super(pluginMetrics); this.dateProcessorConfig = dateProcessorConfig; this.expressionEvaluator = expressionEvaluator; + this.outputFormat = dateProcessorConfig.getOutputFormat(); dateProcessingMatchSuccessCounter = pluginMetrics.counter(DATE_PROCESSING_MATCH_SUCCESS); dateProcessingMatchFailureCounter = pluginMetrics.counter(DATE_PROCESSING_MATCH_FAILURE); @@ -68,10 +75,10 @@ public Collection> doExecute(Collection> records) { String zonedDateTime = null; - if (Boolean.TRUE.equals(dateProcessorConfig.getFromTimeReceived())) + if (Boolean.TRUE.equals(dateProcessorConfig.getFromTimeReceived())) { zonedDateTime = getDateTimeFromTimeReceived(record); - else if (keyToParse != null && !keyToParse.isEmpty()) { + } else if (keyToParse != null && !keyToParse.isEmpty()) { Pair result = getDateTimeFromMatch(record); if (result != null) { zonedDateTime = result.getLeft(); @@ -85,8 +92,9 @@ else if (keyToParse != null && !keyToParse.isEmpty()) { populateDateProcessorMetrics(zonedDateTime); } - if (zonedDateTime != null) + if (zonedDateTime != null) { record.getData().put(dateProcessorConfig.getDestination(), zonedDateTime); + } } return records; } @@ -101,7 +109,8 @@ private void populateDateProcessorMetrics(final String zonedDateTime) { private void extractKeyAndFormatters() { for (DateProcessorConfig.DateMatch entry: dateProcessorConfig.getMatch()) { keyToParse = entry.getKey(); - dateTimeFormatters = entry.getPatterns().stream().map(this::getSourceFormatter).collect(Collectors.toList()); + epochFormatters = entry.getPatterns().stream().filter(pattern -> pattern.contains("epoch")).collect(Collectors.toSet()); + dateTimeFormatters = entry.getPatterns().stream().filter(pattern -> !pattern.contains("epoch")).map(this::getSourceFormatter).collect(Collectors.toList()); } } @@ -146,11 +155,71 @@ private String getSourceTimestamp(final Record record) { } } + private Pair getEpochFormatOutput(Instant time) { + if (outputFormat.equals("epoch_second")) { + return Pair.of(Long.toString(time.getEpochSecond()), time); + } else if (outputFormat.equals("epoch_milli")) { + return Pair.of(Long.toString(time.toEpochMilli()), time); + } else { // epoch_nano. validation for valid epoch_ should be + // done at init time + long nano = (long)time.getEpochSecond() * 1000_000_000 + (long) time.getNano(); + return Pair.of(Long.toString(nano), time); + } + } + private Pair getFormattedDateTimeString(final String sourceTimestamp) { + ZoneId srcZoneId = dateProcessorConfig.getSourceZoneId(); + ZoneId dstZoneId = dateProcessorConfig.getDestinationZoneId(); + Long numberValue = null; + Instant epochTime; + + if (epochFormatters.size() > 0) { + try { + numberValue = Long.parseLong(sourceTimestamp); + } catch (NumberFormatException e) { + numberValue = null; + } + } + if (numberValue != null) { + int timestampLength = sourceTimestamp.length(); + if (timestampLength > LENGTH_OF_EPOCH_IN_MILLIS) { + if (epochFormatters.contains("epoch_nano")) { + epochTime = Instant.ofEpochSecond(numberValue/1000_000_000, numberValue % 1000_000_000); + } else { + LOG.warn("Source time value is larger than epoch pattern configured. epoch_nano is expected but not present in the patterns list"); + return null; + } + } else if (timestampLength > LENGTH_OF_EPOCH_SECONDS) { + if (epochFormatters.contains("epoch_milli")) { + epochTime = Instant.ofEpochMilli(numberValue); + } else { + LOG.warn("Source time value is larger than epoch pattern configured. epoch_milli is expected but not present in the patterns list"); + return null; + } + } else { + epochTime = Instant.ofEpochSecond(numberValue); + } + // Epochs are always UTC zone + srcZoneId = ZoneId.of("UTC"); + try { + if (outputFormat.startsWith("epoch_")) { + return getEpochFormatOutput(epochTime); + } else { + DateTimeFormatter outputFormatter = getOutputFormatter().withZone(dstZoneId); + ZonedDateTime tmp = ZonedDateTime.ofInstant(epochTime, srcZoneId); + return Pair.of(tmp.format(outputFormatter.withZone(dstZoneId)), tmp.toInstant()); + } + } catch (Exception ignored) { + } + } + for (DateTimeFormatter formatter : dateTimeFormatters) { try { ZonedDateTime tmp = ZonedDateTime.parse(sourceTimestamp, formatter); - return Pair.of(tmp.format(getOutputFormatter().withZone(dateProcessorConfig.getDestinationZoneId())), tmp.toInstant()); + if (outputFormat.startsWith("epoch_")) { + return getEpochFormatOutput(tmp.toInstant()); + } + return Pair.of(tmp.format(getOutputFormatter().withZone(dstZoneId)), tmp.toInstant()); } catch (Exception ignored) { } } @@ -160,7 +229,7 @@ private Pair getFormattedDateTimeString(final String sourceTime } private DateTimeFormatter getOutputFormatter() { - return DateTimeFormatter.ofPattern(OUTPUT_FORMAT); + return DateTimeFormatter.ofPattern(outputFormat); } @Override diff --git a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java index 5e06b48cbb..8c9c37957d 100644 --- a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java +++ b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java @@ -12,11 +12,13 @@ import java.time.ZoneId; import java.util.List; import java.util.Locale; +import java.time.format.DateTimeFormatter; public class DateProcessorConfig { static final Boolean DEFAULT_FROM_TIME_RECEIVED = false; static final Boolean DEFAULT_TO_ORIGINATION_METADATA = false; static final String DEFAULT_DESTINATION = "@timestamp"; + static final String DEFAULT_OUTPUT_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; static final String DEFAULT_SOURCE_TIMEZONE = ZoneId.systemDefault().toString(); static final String DEFAULT_DESTINATION_TIMEZONE = ZoneId.systemDefault().toString(); @@ -41,6 +43,41 @@ public String getKey() { public List getPatterns() { return patterns; } + + @JsonIgnore + public boolean isValidPatterns() { + // For now, allow only one of the three "epoch_" pattern + int count = 0; + for (final String pattern: patterns) { + if (pattern.startsWith("epoch_")) { + count++; + } + if (count > 1) { + return false; + } + } + for (final String pattern: patterns) { + if (!isValidPattern(pattern)) { + return false; + } + } + return true; + } + + public static boolean isValidPattern(final String pattern) { + if (pattern.equals("epoch_second") || + pattern.equals("epoch_milli") || + pattern.equals("epoch_nano")) { + return true; + } + try { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern); + return true; + } catch (Exception e) { + return false; + } + } + } @JsonProperty("from_time_received") @@ -55,6 +92,9 @@ public List getPatterns() { @JsonProperty("destination") private String destination = DEFAULT_DESTINATION; + @JsonProperty("output_format") + private String outputFormat = DEFAULT_OUTPUT_FORMAT; + @JsonProperty("source_timezone") private String sourceTimezone = DEFAULT_SOURCE_TIMEZONE; @@ -76,6 +116,10 @@ public List getPatterns() { @JsonIgnore private Locale sourceLocale; + public String getOutputFormat() { + return outputFormat; + } + public Boolean getFromTimeReceived() { return fromTimeReceived; } @@ -160,15 +204,20 @@ boolean isValidMatch() { if (match.size() != 1) return false; - return match.get(0).getPatterns() != null && !match.get(0).getPatterns().isEmpty(); + return match.get(0).getPatterns() != null && !match.get(0).getPatterns().isEmpty() && match.get(0).isValidPatterns(); } return true; } + @AssertTrue(message = "Invalid output format.") + boolean isValidOutputFormat() { + return DateMatch.isValidPattern(outputFormat); + } + @AssertTrue(message = "Invalid source_timezone provided.") boolean isSourceTimezoneValid() { try { - sourceZoneId = buildZoneId(sourceTimezone); + sourceZoneId = buildZoneId(sourceTimezone); return true; } catch (Exception e) { return false; diff --git a/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfigTest.java b/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfigTest.java index db604039fa..d6cc9a9149 100644 --- a/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfigTest.java +++ b/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfigTest.java @@ -11,6 +11,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; import java.lang.reflect.Field; import java.util.Arrays; import java.util.Collections; @@ -42,6 +43,7 @@ class Validation { void setUp() { random = UUID.randomUUID().toString(); mockDateMatch = mock(DateProcessorConfig.DateMatch.class); + when(mockDateMatch.isValidPatterns()).thenReturn(true); } @Test @@ -67,6 +69,23 @@ void isValidMatchAndFromTimestampReceived_should_return_false_if_from_time_recei assertThat(dateProcessorConfig.isValidMatchAndFromTimestampReceived(), equalTo(false)); } + @Test + void testValidAndInvalidOutputFormats() throws NoSuchFieldException, IllegalAccessException { + setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", random); + assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(false)); + + setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_second"); + assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true)); + setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_milli"); + assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true)); + setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_nano"); + assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true)); + setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_xyz"); + assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(false)); + setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "yyyy-MM-dd'T'HH:mm:ss.nnnnnnnnnXXX"); + assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true)); + } + @Test void isValidMatch_should_return_true_if_match_has_single_entry() throws NoSuchFieldException, IllegalAccessException { when(mockDateMatch.getPatterns()).thenReturn(Collections.singletonList(random)); @@ -77,6 +96,16 @@ void isValidMatch_should_return_true_if_match_has_single_entry() throws NoSuchFi assertThat(dateProcessorConfig.isValidMatch(), equalTo(true)); } + @Test + void isValidMatch_should_return_false_if_match_has_multiple_epoch_patterns() throws NoSuchFieldException, IllegalAccessException { + when(mockDateMatch.getPatterns()).thenReturn(List.of("epoch_second", "epoch_milli")); + + List dateMatches = Arrays.asList(mockDateMatch, mockDateMatch); + reflectivelySetField(dateProcessorConfig, "match", dateMatches); + + assertThat(dateProcessorConfig.isValidMatch(), equalTo(false)); + } + @Test void isValidMatch_should_return_false_if_match_has_multiple_entries() throws NoSuchFieldException, IllegalAccessException { when(mockDateMatch.getPatterns()).thenReturn(Collections.singletonList(random)); diff --git a/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorTests.java b/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorTests.java index ce3906a635..684737bca4 100644 --- a/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorTests.java +++ b/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorTests.java @@ -18,12 +18,15 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import java.time.Instant; import java.time.LocalDate; +import java.time.LocalTime; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZonedDateTime; @@ -35,6 +38,8 @@ import java.util.Locale; import java.util.Map; import java.util.UUID; +import java.util.stream.Stream; +import java.util.Random; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -83,6 +88,7 @@ void setup() { lenient().when(pluginMetrics.counter(DateProcessor.DATE_PROCESSING_MATCH_SUCCESS)).thenReturn(dateProcessingMatchSuccessCounter); lenient().when(pluginMetrics.counter(DateProcessor.DATE_PROCESSING_MATCH_FAILURE)).thenReturn(dateProcessingMatchFailureCounter); when(mockDateProcessorConfig.getDateWhen()).thenReturn(null); + lenient().when(mockDateProcessorConfig.getOutputFormat()).thenReturn(DateProcessorConfig.DEFAULT_OUTPUT_FORMAT); expectedInstant = Instant.now(); expectedDateTime = LocalDateTime.ofInstant(expectedInstant, ZoneId.systemDefault()); } @@ -213,6 +219,34 @@ void match_with_custom_destination_test() { verify(dateProcessingMatchSuccessCounter, times(1)).increment(); } + @Test + void match_with_epoch_second_pattern() { + when(mockDateMatch.getKey()).thenReturn("logDate"); + String epochSecondsPattern = "epoch_second"; + when(mockDateMatch.getPatterns()).thenReturn(Collections.singletonList(epochSecondsPattern)); + List dateMatches = Collections.singletonList(mockDateMatch); + when(mockDateProcessorConfig.getMatch()).thenReturn(dateMatches); + when(mockDateProcessorConfig.getSourceZoneId()).thenReturn(ZoneId.of("UTC")); + when(mockDateProcessorConfig.getDestinationZoneId()).thenReturn(ZoneId.systemDefault()); + + dateProcessor = createObjectUnderTest(); + + LocalDate localDate = LocalDate.now(ZoneId.of("UTC")); + testData = getTestData(); + long epochSeconds = Instant.now().getEpochSecond(); + testData.put("logDate", epochSeconds); + + final Record record = buildRecordWithEvent(testData); + final List> processedRecords = (List>) dateProcessor.doExecute(Collections.singletonList(record)); + ZonedDateTime actualZonedDateTime = processedRecords.get(0).getData().get(TIMESTAMP_KEY, ZonedDateTime.class); + LocalDateTime localDateTime = localDate.atTime(LocalTime.now()); + ZonedDateTime expectedZonedDateTime = localDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds), ZoneId.systemDefault()).atZone(ZoneId.systemDefault()); + actualZonedDateTime = actualZonedDateTime.withZoneSameInstant(ZoneId.of("UTC")); + + Assertions.assertTrue(actualZonedDateTime.isEqual(expectedZonedDateTime)); + verify(dateProcessingMatchSuccessCounter, times(1)).increment(); + } + @Test void match_with_missing_hours_minutes_seconds_adds_zeros_test() { when(mockDateMatch.getKey()).thenReturn("logDate"); @@ -240,6 +274,72 @@ void match_with_missing_hours_minutes_seconds_adds_zeros_test() { verify(dateProcessingMatchSuccessCounter, times(1)).increment(); } + private static Stream getInputOutputFormats() { + Instant now = Instant.now(); + long epochSeconds = now.getEpochSecond(); + Random random = new Random(); + long millis = random.nextInt(1000); + long nanos = random.nextInt(1000_000_000); + long epochMillis = epochSeconds * 1000L + millis; + long epochNanos = epochSeconds * 1000_000_000L + nanos; + + ZonedDateTime zdtSeconds = ZonedDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds), java.time.ZoneId.of("UTC")); + ZonedDateTime zdtMillis = ZonedDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), java.time.ZoneId.of("UTC")); + ZonedDateTime zdtNanos = ZonedDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds, nanos), java.time.ZoneId.of("UTC")); + String testFormat = "yyyy-MMM-dd HH:mm:ss.SSS"; + String testNanosFormat = "yyyy-MMM-dd HH:mm:ss.nnnnnnnnnXXX"; + String defaultFormat = DateProcessorConfig.DEFAULT_OUTPUT_FORMAT; + return Stream.of( + Arguments.of("epoch_second", epochSeconds, "epoch_milli", epochSeconds * 1000L), + Arguments.of("epoch_second", epochSeconds, "epoch_nano", epochSeconds * 1000_000_000L), + Arguments.of("epoch_second", epochSeconds, testFormat, zdtSeconds.format(DateTimeFormatter.ofPattern(testFormat))), + Arguments.of("epoch_second", epochSeconds, defaultFormat, zdtSeconds.format(DateTimeFormatter.ofPattern(defaultFormat))), + Arguments.of("epoch_milli", epochMillis, "epoch_second", epochSeconds), + Arguments.of("epoch_milli", epochMillis, "epoch_nano", epochMillis * 1000_000), + Arguments.of("epoch_milli", epochMillis, testFormat, zdtMillis.format(DateTimeFormatter.ofPattern(testFormat))), + Arguments.of("epoch_milli", epochMillis, defaultFormat, zdtMillis.format(DateTimeFormatter.ofPattern(defaultFormat))), + Arguments.of("epoch_nano", epochNanos, "epoch_second", epochSeconds), + Arguments.of("epoch_nano", epochNanos, "epoch_milli", epochNanos/1000_000), + Arguments.of("epoch_nano", epochNanos, testNanosFormat, zdtNanos.format(DateTimeFormatter.ofPattern(testNanosFormat))), + Arguments.of("epoch_nano", epochNanos, defaultFormat, zdtNanos.format(DateTimeFormatter.ofPattern(defaultFormat))), + Arguments.of(testNanosFormat, zdtNanos.format(DateTimeFormatter.ofPattern(testNanosFormat)), "epoch_second", epochSeconds), + Arguments.of(testNanosFormat, zdtNanos.format(DateTimeFormatter.ofPattern(testNanosFormat)), "epoch_milli", epochNanos/1000_000), + Arguments.of(testNanosFormat, zdtNanos.format(DateTimeFormatter.ofPattern(testNanosFormat)), "epoch_nano", epochNanos), + Arguments.of(testNanosFormat, zdtNanos.format(DateTimeFormatter.ofPattern(testNanosFormat)), defaultFormat, zdtNanos.format(DateTimeFormatter.ofPattern(defaultFormat))) + ); + } + @ParameterizedTest + @MethodSource("getInputOutputFormats") + void match_with_different_input_output_formats(String inputFormat, Object input, String outputFormat, Object expectedOutput) { + when(mockDateMatch.getKey()).thenReturn("logDate"); + when(mockDateMatch.getPatterns()).thenReturn(Collections.singletonList(inputFormat)); + when(mockDateProcessorConfig.getOutputFormat()).thenReturn(outputFormat); + + List dateMatches = Collections.singletonList(mockDateMatch); + when(mockDateProcessorConfig.getMatch()).thenReturn(dateMatches); + + when(mockDateProcessorConfig.getSourceZoneId()).thenReturn(ZoneId.of("UTC")); + when(mockDateProcessorConfig.getDestinationZoneId()).thenReturn(ZoneId.systemDefault()); + if (!inputFormat.startsWith("epoch_")) { + when(mockDateProcessorConfig.getSourceLocale()).thenReturn(Locale.ROOT); + } + dateProcessor = createObjectUnderTest(); + testData = getTestData(); + testData.put("logDate", input); + final Record record = buildRecordWithEvent(testData); + final List> processedRecords = (List>) dateProcessor.doExecute(Collections.singletonList(record)); + if (outputFormat.equals("epoch_second") || + outputFormat.equals("epoch_milli") || + outputFormat.equals("epoch_nano")) { + Long actualOutput = processedRecords.get(0).getData().get(TIMESTAMP_KEY, Long.class); + assertThat(actualOutput, equalTo((Long)expectedOutput)); + } else { + String actualOutput= processedRecords.get(0).getData().get(TIMESTAMP_KEY, String.class); + assertThat(actualOutput, equalTo((String)expectedOutput)); + } + verify(dateProcessingMatchSuccessCounter, times(1)).increment(); + } + @Test void match_with_wrong_patterns_return_same_record_test_without_timestamp() { when(mockDateMatch.getKey()).thenReturn("logDate");