diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/StdInSource.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/StdInSource.java index bac92a988a..266bf617bb 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/StdInSource.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/StdInSource.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; @@ -25,7 +26,7 @@ * A simple source which reads data from console each line at a time. It exits when it reads case insensitive "exit" * from console or if Pipeline notifies to stop. */ -@DataPrepperPlugin(name = "stdin", pluginType = Source.class) +@DataPrepperPlugin(name = "stdin", pluginType = Source.class, pluginConfigurationType = StdInSourceConfig.class) public class StdInSource implements Source> { private static final Logger LOG = LoggerFactory.getLogger(StdInSource.class); private static final String ATTRIBUTE_TIMEOUT = "write_timeout"; @@ -37,16 +38,15 @@ public class StdInSource implements Source> { /** * Mandatory constructor for Data Prepper Component - This constructor is used by Data Prepper - * runtime engine to construct an instance of {@link StdInSource} using an instance of {@link PluginSetting} which - * has access to pluginSetting metadata from pipeline - * pluginSetting file. + * runtime engine to construct an instance of {@link StdInSource} using an instance of {@link StdInSourceConfig} * - * @param pluginSetting instance with metadata information from pipeline pluginSetting file. + * @param stdInSourceConfig The configuration instance for {@link StdInSource} + * @param pipelineDescription The pipeline description which has access to pipeline Name */ - public StdInSource(final PluginSetting pluginSetting) { - this(checkNotNull(pluginSetting, "PluginSetting cannot be null") - .getIntegerOrDefault(ATTRIBUTE_TIMEOUT, WRITE_TIMEOUT), - pluginSetting.getPipelineName()); + public StdInSource(final StdInSourceConfig stdInSourceConfig, final PipelineDescription pipelineDescription) { + this(checkNotNull(stdInSourceConfig, "StdInSourceConfig cannot be null") + .getWriteTimeout(), + pipelineDescription.getPipelineName()); } public StdInSource(final int writeTimeout, final String pipelineName) { diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/StdInSourceConfig.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/StdInSourceConfig.java new file mode 100644 index 0000000000..66ce3a17c0 --- /dev/null +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/StdInSourceConfig.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class StdInSourceConfig { + private static final int WRITE_TIMEOUT = 5_000; + + @JsonProperty("write_timeout") + private int writeTimeout = WRITE_TIMEOUT; + + public int getWriteTimeout() { + return writeTimeout; + } +} diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/StdInSourceTests.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/StdInSourceTests.java index d4a42751ad..9be66321a0 100644 --- a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/StdInSourceTests.java +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/StdInSourceTests.java @@ -5,10 +5,13 @@ package org.opensearch.dataprepper.plugins.source; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; @@ -18,6 +21,7 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.Queue; @@ -27,6 +31,8 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; class StdInSourceTests { private static final String SOURCE_CONTENT = "THIS IS A TEST\nexit"; @@ -56,10 +62,15 @@ void testStdInSourceCreationUsingParameters() { } @Test - void testStdInSourceCreationUsingPluginSetting() { - final PluginSetting pluginSetting = new PluginSetting("stdin", null); - pluginSetting.setPipelineName(TEST_PIPELINE_NAME); - final StdInSource stdInSource = new StdInSource(pluginSetting); + void testStdInSourceCreationUsingStdInSourceConfig() throws JsonProcessingException { + final HashMap configMap = new HashMap<>(); + configMap.put("write_timeout", 5_000); + ObjectMapper objectMapper = new ObjectMapper(); + String json = objectMapper.writeValueAsString(configMap); + StdInSourceConfig stdInSourceConfig = objectMapper.readValue(json, StdInSourceConfig.class); + PipelineDescription pipelineDescription = mock(PipelineDescription.class); + when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + final StdInSource stdInSource = new StdInSource(stdInSourceConfig, pipelineDescription); assertThat(stdInSource, notNullValue()); } @@ -74,10 +85,11 @@ void testStdInSourceCreationWithNullPipelineName() { @Test void testStdInSourceCreationWithNullPluginSetting() { + PipelineDescription pipelineDescription = mock(PipelineDescription.class); try { - new StdInSource(null); + new StdInSource(null, pipelineDescription); } catch (NullPointerException ex) { - assertThat(ex.getMessage(), is(equalTo("PluginSetting cannot be null"))); + assertThat(ex.getMessage(), is(equalTo("StdInSourceConfig cannot be null"))); } }