diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java index b2dd7a5541..290198f54f 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper; import io.micrometer.core.instrument.util.StringUtils; +import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.parser.PipelineTransformer; import org.opensearch.dataprepper.peerforwarder.server.PeerForwarderServer; @@ -42,6 +43,7 @@ public class DataPrepper implements PipelinesProvider { private final PipelinesObserver pipelinesObserver; private final Map transformationPipelines; private final Predicate> shouldShutdownOnPipelineFailurePredicate; + private final PipelinesDataFlowModel pipelinesDataFlowModel; // TODO: Remove DataPrepperServer dependency on DataPrepper @Inject @@ -67,8 +69,9 @@ public DataPrepper( this.pluginFactory = pluginFactory; transformationPipelines = pipelineTransformer.transformConfiguration(); + pipelinesDataFlowModel = pipelineTransformer.getPipelinesDataFlowModel(); this.shouldShutdownOnPipelineFailurePredicate = shouldShutdownOnPipelineFailurePredicate; - if (transformationPipelines.size() == 0) { + if (transformationPipelines.isEmpty()) { throw new RuntimeException("No valid pipeline is available for execution, exiting"); } this.peerForwarderServer = peerForwarderServer; @@ -145,6 +148,10 @@ public Map getTransformationPipelines() { return transformationPipelines; } + public PipelinesDataFlowModel getPipelinesDataFlowModel() { + return pipelinesDataFlowModel; + } + public void registerShutdownHandler(final DataPrepperShutdownListener shutdownListener) { this.shutdownListeners.add(shutdownListener); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java index b3f4aede00..c57c5eab61 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java @@ -386,4 +386,8 @@ private Buffer applyCircuitBreakerToBuffer(final Source source, final Buffer buf .map(b -> (Buffer) b) .orElseGet(() -> buffer); } + + public PipelinesDataFlowModel getPipelinesDataFlowModel() { + return pipelinesDataFlowModel; + } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelinesProvider.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelinesProvider.java index e948bd176a..8d6dc8bda8 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelinesProvider.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelinesProvider.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.pipeline; +import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; + import java.util.Map; /** @@ -12,4 +14,5 @@ */ public interface PipelinesProvider { Map getTransformationPipelines(); + PipelinesDataFlowModel getPipelinesDataFlowModel(); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/DataPrepperServer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/DataPrepperServer.java index 3158cde82c..5670af0a13 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/DataPrepperServer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/DataPrepperServer.java @@ -30,6 +30,7 @@ public class DataPrepperServer { private static final Logger LOG = LoggerFactory.getLogger(DataPrepperServer.class); private final HttpServerProvider serverProvider; private final ListPipelinesHandler listPipelinesHandler; + private final GetPipelinesHandler getPipelinesHandler; private final ShutdownHandler shutdownHandler; private final PrometheusMeterRegistry prometheusMeterRegistry; private final Authenticator authenticator; @@ -41,12 +42,14 @@ public DataPrepperServer( final HttpServerProvider serverProvider, final ListPipelinesHandler listPipelinesHandler, final ShutdownHandler shutdownHandler, + final GetPipelinesHandler getPipelinesHandler, @Autowired(required = false) @Nullable final PrometheusMeterRegistry prometheusMeterRegistry, @Autowired(required = false) @Nullable final Authenticator authenticator ) { this.serverProvider = serverProvider; this.listPipelinesHandler = listPipelinesHandler; this.shutdownHandler = shutdownHandler; + this.getPipelinesHandler = getPipelinesHandler; this.prometheusMeterRegistry = prometheusMeterRegistry; this.authenticator = authenticator; executorService = Executors.newFixedThreadPool(3); @@ -67,6 +70,7 @@ private HttpServer createServer() { createContext(server, listPipelinesHandler, authenticator, "/list"); createContext(server, shutdownHandler, authenticator, "/shutdown"); + createContext(server, getPipelinesHandler, authenticator, "/pipelines"); if (prometheusMeterRegistry != null) { final PrometheusMetricsHandler prometheusMetricsHandler = new PrometheusMetricsHandler(prometheusMeterRegistry); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/GetPipelinesHandler.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/GetPipelinesHandler.java new file mode 100644 index 0000000000..9f19a599b9 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/GetPipelinesHandler.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.pipeline.server; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import org.opensearch.dataprepper.model.configuration.PipelineModel; +import org.opensearch.dataprepper.pipeline.PipelinesProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.HttpMethod; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.util.ArrayList; +import java.util.List; + +public class GetPipelinesHandler implements HttpHandler { + + private final PipelinesProvider pipelinesProvider; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final Logger LOG = LoggerFactory.getLogger(GetPipelinesHandler.class); + + public GetPipelinesHandler(final PipelinesProvider pipelinesProvider) { + this.pipelinesProvider = pipelinesProvider; + } + + @Override + public void handle(final HttpExchange exchange) throws IOException { + String requestMethod = exchange.getRequestMethod(); + if (!requestMethod.equals(HttpMethod.GET)) { + exchange.sendResponseHeaders(HttpURLConnection.HTTP_BAD_METHOD, 0); + exchange.getResponseBody().close(); + return; + } + + try { + List pipelineModels = new ArrayList<>(pipelinesProvider.getPipelinesDataFlowModel().getPipelines().values()); + + final byte[] response = OBJECT_MAPPER.writeValueAsString(pipelineModels).getBytes(); + + exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=UTF-8"); + exchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, response.length); + exchange.getResponseBody().write(response); + } catch (final Exception e) { + LOG.error("Caught exception listing pipelines", e); + exchange.sendResponseHeaders(HttpURLConnection.HTTP_INTERNAL_ERROR, 0); + } finally { + exchange.getResponseBody().close(); + } + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/config/DataPrepperServerConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/config/DataPrepperServerConfiguration.java index f2767d33f0..49b12a8e52 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/config/DataPrepperServerConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/config/DataPrepperServerConfiguration.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.pipeline.PipelinesProvider; import org.opensearch.dataprepper.pipeline.server.DataPrepperCoreAuthenticationProvider; +import org.opensearch.dataprepper.pipeline.server.GetPipelinesHandler; import org.opensearch.dataprepper.pipeline.server.ListPipelinesHandler; import org.opensearch.dataprepper.pipeline.server.ShutdownHandler; import com.sun.net.httpserver.Authenticator; @@ -74,4 +75,9 @@ public ListPipelinesHandler listPipelinesHandler(final PipelinesProvider pipelin public ShutdownHandler shutdownHandler(final DataPrepper dataPrepper) { return new ShutdownHandler(dataPrepper); } + + @Bean + public GetPipelinesHandler GetPipelinesHandler(final PipelinesProvider pipelinesProvider) { + return new GetPipelinesHandler(pipelinesProvider); + } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/server/DataPrepperServerTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/server/DataPrepperServerTest.java index f82ffc2660..35c392ca8a 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/server/DataPrepperServerTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/server/DataPrepperServerTest.java @@ -48,6 +48,9 @@ public class DataPrepperServerTest { @Mock private ShutdownHandler shutdownHandler; + @Mock + private GetPipelinesHandler getPipelinesHandler; + @Mock private PrometheusMeterRegistry prometheusMeterRegistry; @@ -82,7 +85,7 @@ public void testGivenValidServerWhenStartThenShouldCallServerStart() { verifyServerStart(); verify(server).createContext(eq("/metrics/prometheus"), any(PrometheusMetricsHandler.class)); verify(server).createContext(eq("/metrics/sys"), any(PrometheusMetricsHandler.class)); - verify(context, times(4)).setAuthenticator(eq(authenticator)); + verify(context, times(5)).setAuthenticator(eq(authenticator)); } @Test @@ -93,7 +96,7 @@ public void testGivenValidServerWhenStartThenShouldCallServerStart_NullPrometheu dataPrepperServer.start(); verifyServerStart(); - verify(context, times(2)).setAuthenticator(eq(authenticator)); + verify(context, times(3)).setAuthenticator(eq(authenticator)); } @Test @@ -145,6 +148,7 @@ private void verifyServerStart() { verify(httpServerProvider).get(); verify(server).createContext("/list", listPipelinesHandler); verify(server).createContext(eq("/shutdown"), eq(shutdownHandler)); + verify(server).createContext(eq("/pipelines"), eq(getPipelinesHandler)); final ArgumentCaptor executorServiceArgumentCaptor = ArgumentCaptor.forClass(ExecutorService.class); verify(server).setExecutor(executorServiceArgumentCaptor.capture()); final ExecutorService actualExecutorService = executorServiceArgumentCaptor.getValue(); @@ -158,6 +162,6 @@ private void verifyServerStart() { } private DataPrepperServer createObjectUnderTest(final PrometheusMeterRegistry prometheusMeterRegistry, final Authenticator authenticator) { - return new DataPrepperServer(httpServerProvider, listPipelinesHandler, shutdownHandler, prometheusMeterRegistry, authenticator); + return new DataPrepperServer(httpServerProvider, listPipelinesHandler, shutdownHandler, getPipelinesHandler, prometheusMeterRegistry, authenticator); } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/server/GetPipelinesHandlerTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/server/GetPipelinesHandlerTest.java new file mode 100644 index 0000000000..8f0d2b5d1e --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/server/GetPipelinesHandlerTest.java @@ -0,0 +1,126 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.pipeline.server; + +import com.sun.net.httpserver.Headers; +import com.sun.net.httpserver.HttpExchange; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; +import org.opensearch.dataprepper.model.configuration.PipelineModel; +import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.SinkModel; +import org.opensearch.dataprepper.pipeline.PipelinesProvider; + +import javax.ws.rs.HttpMethod; +import java.io.IOException; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class GetPipelinesHandlerTest { + @Mock + private PipelinesProvider pipelinesProvider; + @Mock + private HttpExchange httpExchange; + + @Mock + private OutputStream outputStream; + + @BeforeEach + public void beforeEach() { + when(httpExchange.getResponseBody()) + .thenReturn(outputStream); + } + + private GetPipelinesHandler createObjectUnderTest() { + return new GetPipelinesHandler(pipelinesProvider); + } + + @ParameterizedTest + @ValueSource(strings = { HttpMethod.GET }) + public void testGivenPipelinesThenResponseWritten(String httpMethod) throws IOException { + final String pipelineName = "test-pipeline"; + final Headers headers = mock(Headers.class); + doNothing().when(headers).add(anyString(), anyString()); + final DataPrepperVersion version = DataPrepperVersion.parse("2.0"); + final PluginModel source = new PluginModel("testSource", (Map) null); + final List processors = Collections.singletonList(new PluginModel("testProcessor", (Map) null)); + final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null)); + final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50); + + final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(version, Collections.singletonMap(pipelineName, pipelineModel)); + + when(pipelinesProvider.getPipelinesDataFlowModel()) + .thenReturn(pipelinesDataFlowModel); + when(httpExchange.getResponseHeaders()) + .thenReturn(headers); + when(httpExchange.getRequestMethod()) + .thenReturn(httpMethod); + + final GetPipelinesHandler handler = createObjectUnderTest(); + + handler.handle(httpExchange); + + verify(headers) + .add(eq("Content-Type"), eq("text/plain; charset=UTF-8")); + verify(httpExchange) + .sendResponseHeaders(eq(HttpURLConnection.HTTP_OK), anyLong()); + verify(outputStream) + .write(any(byte[].class)); + verify(outputStream) + .close(); + } + + @ParameterizedTest + @ValueSource(strings = { HttpMethod.DELETE, HttpMethod.PATCH, HttpMethod.PUT, HttpMethod.POST }) + public void testGivenProhibitedHttpMethodThenErrorResponseWritten(String httpMethod) throws IOException { + final GetPipelinesHandler handler = createObjectUnderTest(); + + when(httpExchange.getRequestMethod()) + .thenReturn(httpMethod); + + handler.handle(httpExchange); + + verify(httpExchange) + .sendResponseHeaders(eq(HttpURLConnection.HTTP_BAD_METHOD), eq(0L)); + verify(outputStream) + .close(); + } + + @ParameterizedTest + @ValueSource(strings = { HttpMethod.GET }) + public void testGivenExceptionThrownThenErrorResponseWritten(String httpMethod) throws IOException { + when(httpExchange.getRequestMethod()) + .thenReturn(httpMethod); + + pipelinesProvider = null; + final GetPipelinesHandler handler = createObjectUnderTest(); + handler.handle(httpExchange); + + verify(httpExchange) + .sendResponseHeaders(eq(HttpURLConnection.HTTP_INTERNAL_ERROR), eq(0L)); + verify(outputStream) + .close(); + } +}