diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourcePartition.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourcePartition.java index 0fa683b2a5..6ec1758976 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourcePartition.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourcePartition.java @@ -5,9 +5,11 @@ package org.opensearch.dataprepper.model.source.coordinator.enhanced; +import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +35,8 @@ public abstract class EnhancedSourcePartition implements EnhancedPartition { private static final Logger LOG = LoggerFactory.getLogger(EnhancedSourcePartition.class); - private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final ObjectMapper objectMapper = new ObjectMapper(new JsonFactory()) + .registerModule(new JavaTimeModule()); private SourcePartitionStoreItem sourcePartitionStoreItem; @@ -49,9 +52,8 @@ public void setSourcePartitionStoreItem(SourcePartitionStoreItem sourcePartition * Helper method to convert progress state. * This is because the state is currently stored as a String in the coordination store. * - * @param progressStateClass class of progress state + * @param progressStateClass class of progress state * @param serializedPartitionProgressState serialized value of the partition progress state - * * @return returns the converted value of the progress state */ public T convertStringToPartitionProgressState(Class progressStateClass, final String serializedPartitionProgressState) { diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourcePartitionTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourcePartitionTest.java index 36ba2ba63b..cf4e840442 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourcePartitionTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourcePartitionTest.java @@ -11,6 +11,7 @@ import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import java.time.Instant; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -22,14 +23,17 @@ public class EnhancedSourcePartitionTest { + private final ObjectMapper objectMapper = new ObjectMapper(); private String partitionKey; private TestPartitionProgressState testPartitionProgressState; - private final ObjectMapper objectMapper = new ObjectMapper(); + private TestInstantTypeProgressState testInstantTypeProgressState; @BeforeEach void setup() { partitionKey = UUID.randomUUID().toString(); testPartitionProgressState = new TestPartitionProgressState(UUID.randomUUID().toString()); + testInstantTypeProgressState = new TestInstantTypeProgressState(Instant.now()); + } private EnhancedSourcePartition createObjectUnderTest() { @@ -79,6 +83,19 @@ void convertFromStringToPartitionState_converts_as_expected() { assertThat(result.getTestValue(), equalTo(testPartitionProgressState.getTestValue())); } + @Test + void convertFromInstantToPartitionState_converts_as_expected() { + final EnhancedSourcePartition objectUnderTest = + new TestInstantTypeSourcePartition(partitionKey, testInstantTypeProgressState); + + final String serializedString = objectUnderTest.convertPartitionProgressStatetoString(Optional.of(testInstantTypeProgressState)); + + final TestInstantTypeProgressState result = + objectUnderTest.convertStringToPartitionProgressState(TestInstantTypeProgressState.class, serializedString); + assertThat(result, notNullValue()); + assertThat(result.getTestValue(), equalTo(testInstantTypeProgressState.getTestValue())); + } + @Test void convertPartitionStateToStringWithEmptyState_returns_null() { final String result = createObjectUnderTest().convertPartitionProgressStatetoString(Optional.empty()); @@ -116,6 +133,31 @@ void convertFromStringToPartitionStateWithPrimitiveType_returns_expected_result( assertThat(resultWithNullClass, equalTo(stateMap)); } + public static class TestInstantTypeSourcePartition extends EnhancedSourcePartition { + + private final String partitionKey; + private final TestInstantTypeProgressState testPartitionProgressState; + + public TestInstantTypeSourcePartition(final String partitionKey, final TestInstantTypeProgressState partitionProgressState) { + this.partitionKey = partitionKey; + this.testPartitionProgressState = partitionProgressState; + } + + @Override + public String getPartitionType() { + return "TEST"; + } + + @Override + public String getPartitionKey() { + return partitionKey; + } + + @Override + public Optional getProgressState() { + return Optional.of(testPartitionProgressState); + } + } public class TestEnhancedSourcePartition extends EnhancedSourcePartition { diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/TestInstantTypeProgressState.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/TestInstantTypeProgressState.java new file mode 100644 index 0000000000..b305ca0e96 --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/TestInstantTypeProgressState.java @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.source.coordinator.enhanced; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.time.Instant; + +public class TestInstantTypeProgressState { + + @JsonProperty("testValue") + private Instant testValue; + + public TestInstantTypeProgressState(@JsonProperty("testValue") final Instant testValue) { + this.testValue = testValue; + } + + public Instant getTestValue() { + return testValue; + } + +} \ No newline at end of file