Skip to content

Commit

Permalink
Migrate STdInSource configuration off of pluginSetting
Browse files Browse the repository at this point in the history
Signed-off-by: Maxwell Brown <[email protected]>
  • Loading branch information
Galactus22625 committed Jan 8, 2025
1 parent 14d161c commit 009d6a5
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
Expand All @@ -25,7 +26,7 @@
* A simple source which reads data from console each line at a time. It exits when it reads case insensitive "exit"
* from console or if Pipeline notifies to stop.
*/
@DataPrepperPlugin(name = "stdin", pluginType = Source.class)
@DataPrepperPlugin(name = "stdin", pluginType = Source.class, pluginConfigurationType = StdInSourceConfig.class)
public class StdInSource implements Source<Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(StdInSource.class);
private static final String ATTRIBUTE_TIMEOUT = "write_timeout";
Expand All @@ -37,16 +38,15 @@ public class StdInSource implements Source<Record<Event>> {

/**
* Mandatory constructor for Data Prepper Component - This constructor is used by Data Prepper
* runtime engine to construct an instance of {@link StdInSource} using an instance of {@link PluginSetting} which
* has access to pluginSetting metadata from pipeline
* pluginSetting file.
* runtime engine to construct an instance of {@link StdInSource} using an instance of {@link StdInSourceConfig}
*
* @param pluginSetting instance with metadata information from pipeline pluginSetting file.
* @param stdInSourceConfig The configuration instance for {@link StdInSource}
* @param pipelineDescription The pipeline description which has access to pipeline Name
*/
public StdInSource(final PluginSetting pluginSetting) {
this(checkNotNull(pluginSetting, "PluginSetting cannot be null")
.getIntegerOrDefault(ATTRIBUTE_TIMEOUT, WRITE_TIMEOUT),
pluginSetting.getPipelineName());
public StdInSource(final StdInSourceConfig stdInSourceConfig, final PipelineDescription pipelineDescription) {
this(checkNotNull(stdInSourceConfig, "StdInSourceConfig cannot be null")
.getWriteTimeout(),
pipelineDescription.getPipelineName());
}

public StdInSource(final int writeTimeout, final String pipelineName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source;

import com.fasterxml.jackson.annotation.JsonProperty;

public class StdInSourceConfig {
private static final int WRITE_TIMEOUT = 5_000;

@JsonProperty("write_timeout")
private int writeTimeout = WRITE_TIMEOUT;

public int getWriteTimeout() {
return writeTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@

package org.opensearch.dataprepper.plugins.source;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
Expand All @@ -18,6 +21,7 @@
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
Expand All @@ -27,6 +31,8 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class StdInSourceTests {
private static final String SOURCE_CONTENT = "THIS IS A TEST\nexit";
Expand Down Expand Up @@ -56,10 +62,15 @@ void testStdInSourceCreationUsingParameters() {
}

@Test
void testStdInSourceCreationUsingPluginSetting() {
final PluginSetting pluginSetting = new PluginSetting("stdin", null);
pluginSetting.setPipelineName(TEST_PIPELINE_NAME);
final StdInSource stdInSource = new StdInSource(pluginSetting);
void testStdInSourceCreationUsingStdInSourceConfig() throws JsonProcessingException {
final HashMap<String, Object> configMap = new HashMap<>();
configMap.put("write_timeout", 5_000);
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(configMap);
StdInSourceConfig stdInSourceConfig = objectMapper.readValue(json, StdInSourceConfig.class);
PipelineDescription pipelineDescription = mock(PipelineDescription.class);
when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);
final StdInSource stdInSource = new StdInSource(stdInSourceConfig, pipelineDescription);
assertThat(stdInSource, notNullValue());
}

Expand All @@ -74,10 +85,11 @@ void testStdInSourceCreationWithNullPipelineName() {

@Test
void testStdInSourceCreationWithNullPluginSetting() {
PipelineDescription pipelineDescription = mock(PipelineDescription.class);
try {
new StdInSource(null);
new StdInSource(null, pipelineDescription);
} catch (NullPointerException ex) {
assertThat(ex.getMessage(), is(equalTo("PluginSetting cannot be null")));
assertThat(ex.getMessage(), is(equalTo("StdInSourceConfig cannot be null")));
}
}

Expand Down

0 comments on commit 009d6a5

Please sign in to comment.