diff --git a/CHANGELOG.md b/CHANGELOG.md index a0b5dd289ec85..84c7376492915 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote Store] Add dynamic cluster settings to set timeout for segments upload to Remote Store ([#13679](https://github.com/opensearch-project/OpenSearch/pull/13679)) - [Remote Store] Upload translog checkpoint as object metadata to translog.tlog([#13637](https://github.com/opensearch-project/OpenSearch/pull/13637)) - Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819)) +- [Streaming Indexing] Enhance RestAction with request / response streaming support ([#13772](https://github.com/opensearch-project/OpenSearch/pull/13772)) ### Dependencies - Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559)) diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/HttpContentSender.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/HttpContentSender.java new file mode 100644 index 0000000000000..49a3f5625ac16 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/HttpContentSender.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.core.action.ActionListener; + +import io.netty.handler.codec.http.HttpContent; +import org.reactivestreams.Publisher; + +/** + * The generic interface for chunked {@link HttpContent} producers (response streaming). + */ +interface HttpContentSender extends Publisher { + /** + * Sends the next {@link HttpContent} over the wire + * @param content next {@link HttpContent} + * @param listener action listener + * @param isLast {@code true} if this is the last chunk, {@code false} otherwise + */ + void send(HttpContent content, ActionListener listener, boolean isLast); +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java new file mode 100644 index 0000000000000..d9c669ffea056 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.http.HttpChunk; +import org.opensearch.transport.reactor.netty4.Netty4Utils; + +import java.util.concurrent.atomic.AtomicBoolean; + +import io.netty.buffer.ByteBuf; + +class ReactorNetty4HttpChunk implements HttpChunk { + private final AtomicBoolean released; + private final boolean pooled; + private final ByteBuf content; + private final boolean last; + + ReactorNetty4HttpChunk(ByteBuf content, boolean last) { + this(new AtomicBoolean(false), true, content, last); + } + + private ReactorNetty4HttpChunk(AtomicBoolean released, boolean pooled, ByteBuf content, boolean last) { + this.content = content; + this.pooled = pooled; + this.released = released; + this.last = last; + } + + @Override + public BytesReference content() { + assert released.get() == false; + return Netty4Utils.toBytesReference(content); + } + + @Override + public void release() { + if (pooled && released.compareAndSet(false, true)) { + content.release(); + } + } + + @Override + public boolean isLast() { + return last; + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java index bd1646d753016..7a477b0e5d561 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java @@ -26,6 +26,8 @@ import org.opensearch.http.HttpServerChannel; import org.opensearch.http.reactor.netty4.ssl.SslUtils; import org.opensearch.plugins.SecureHttpTransportSettingsProvider; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest.Method; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.reactor.SharedGroupFactory; @@ -40,6 +42,7 @@ import java.time.Duration; import java.util.Arrays; import java.util.List; +import java.util.Optional; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelOption; @@ -351,24 +354,46 @@ public List protocols() { * @return response publisher */ protected Publisher incomingRequest(HttpServerRequest request, HttpServerResponse response) { - final NonStreamingRequestConsumer consumer = new NonStreamingRequestConsumer<>( - this, - request, - response, - maxCompositeBufferComponents + final Method method = HttpConversionUtil.convertMethod(request.method()); + final Optional dispatchHandlerOpt = dispatcher.dispatchHandler( + request.uri(), + request.fullPath(), + method, + request.params() ); + if (dispatchHandlerOpt.map(RestHandler::supportsStreaming).orElse(false)) { + final ReactorNetty4StreamingRequestConsumer consumer = new ReactorNetty4StreamingRequestConsumer<>( + this, + request, + response + ); + + request.receiveContent() + .switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)) + .subscribe(consumer, error -> {}, () -> consumer.accept(DefaultLastHttpContent.EMPTY_LAST_CONTENT)); + consumer.start(); + + return response.sendObject(consumer); + } else { + final ReactorNetty4NonStreamingRequestConsumer consumer = new ReactorNetty4NonStreamingRequestConsumer<>( + this, + request, + response, + maxCompositeBufferComponents + ); - request.receiveContent().switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)).subscribe(consumer); - - return Mono.from(consumer).flatMap(hc -> { - final FullHttpResponse r = (FullHttpResponse) hc; - response.status(r.status()); - response.trailerHeaders(c -> r.trailingHeaders().forEach(h -> c.add(h.getKey(), h.getValue()))); - response.chunkedTransfer(false); - response.compression(true); - r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue())); - return Mono.from(response.sendObject(r.content())); - }); + request.receiveContent().switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)).subscribe(consumer); + + return Mono.from(consumer).flatMap(hc -> { + final FullHttpResponse r = (FullHttpResponse) hc; + response.status(r.status()); + response.trailerHeaders(c -> r.trailingHeaders().forEach(h -> c.add(h.getKey(), h.getValue()))); + response.chunkedTransfer(false); + response.compression(true); + r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue())); + return Mono.from(response.sendObject(r.content())); + }); + } } /** diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingHttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingHttpChannel.java similarity index 92% rename from plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingHttpChannel.java rename to plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingHttpChannel.java index 98b359319ff1b..7df0b3c0c35fe 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingHttpChannel.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingHttpChannel.java @@ -23,13 +23,13 @@ import reactor.netty.http.server.HttpServerRequest; import reactor.netty.http.server.HttpServerResponse; -class NonStreamingHttpChannel implements HttpChannel { +class ReactorNetty4NonStreamingHttpChannel implements HttpChannel { private final HttpServerRequest request; private final HttpServerResponse response; private final CompletableContext closeContext = new CompletableContext<>(); private final FluxSink emitter; - NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink emitter) { + ReactorNetty4NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink emitter) { this.request = request; this.response = response; this.emitter = emitter; diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingRequestConsumer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingRequestConsumer.java similarity index 89% rename from plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingRequestConsumer.java rename to plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingRequestConsumer.java index d43e23e800e65..c09e7755b1670 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingRequestConsumer.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingRequestConsumer.java @@ -25,7 +25,7 @@ import reactor.netty.http.server.HttpServerRequest; import reactor.netty.http.server.HttpServerResponse; -class NonStreamingRequestConsumer implements Consumer, Publisher, Disposable { +class ReactorNetty4NonStreamingRequestConsumer implements Consumer, Publisher, Disposable { private final HttpServerRequest request; private final HttpServerResponse response; private final CompositeByteBuf content; @@ -34,7 +34,7 @@ class NonStreamingRequestConsumer implements Consumer, private final AtomicBoolean disposed = new AtomicBoolean(false); private volatile FluxSink emitter; - NonStreamingRequestConsumer( + ReactorNetty4NonStreamingRequestConsumer( AbstractHttpServerTransport transport, HttpServerRequest request, HttpServerResponse response, @@ -64,12 +64,12 @@ public void accept(T message) { } } - public void process(HttpContent in, FluxSink emitter) { + void process(HttpContent in, FluxSink emitter) { // Consume request body in full before dispatching it content.addComponent(true, in.content().retain()); if (in instanceof LastHttpContent) { - final NonStreamingHttpChannel channel = new NonStreamingHttpChannel(request, response, emitter); + final ReactorNetty4NonStreamingHttpChannel channel = new ReactorNetty4NonStreamingHttpChannel(request, response, emitter); final HttpRequest r = createRequest(request, content); try { diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java new file mode 100644 index 0000000000000..473ddb61f1d21 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java @@ -0,0 +1,118 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.common.concurrent.CompletableContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.http.HttpChunk; +import org.opensearch.http.HttpResponse; +import org.opensearch.http.StreamingHttpChannel; +import org.opensearch.transport.reactor.netty4.Netty4Utils; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; + +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpContent; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.netty.http.server.HttpServerRequest; +import reactor.netty.http.server.HttpServerResponse; + +class ReactorNetty4StreamingHttpChannel implements StreamingHttpChannel { + private final HttpServerRequest request; + private final HttpServerResponse response; + private final CompletableContext closeContext = new CompletableContext<>(); + private final Publisher receiver; + private final HttpContentSender sender; + private volatile FluxSink producer; + private volatile boolean lastChunkReceived = false; + + ReactorNetty4StreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, HttpContentSender sender) { + this.request = request; + this.response = response; + this.sender = sender; + this.receiver = Flux.create(producer -> this.producer = producer); + this.request.withConnection(connection -> Netty4Utils.addListener(connection.channel().closeFuture(), closeContext)); + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() { + request.withConnection(connection -> connection.channel().close()); + } + + @Override + public void addCloseListener(ActionListener listener) { + closeContext.addListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public void sendChunk(HttpChunk chunk, ActionListener listener) { + sender.send(createContent(chunk), listener, chunk.isLast()); + } + + @Override + public void sendResponse(HttpResponse response, ActionListener listener) { + sender.send(createContent(response), listener, true); + } + + @Override + public void prepareResponse(int status, Map> headers) { + this.response.status(status); + headers.forEach((k, vs) -> vs.forEach(v -> this.response.addHeader(k, v))); + } + + @Override + public InetSocketAddress getRemoteAddress() { + return (InetSocketAddress) response.remoteAddress(); + } + + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) response.hostAddress(); + } + + @Override + public void receiveChunk(HttpChunk message) { + if (lastChunkReceived) { + return; + } + + producer.next(message); + if (message.isLast()) { + lastChunkReceived = true; + producer.complete(); + } + } + + @Override + public void subscribe(Subscriber subscriber) { + receiver.subscribe(subscriber); + } + + private static HttpContent createContent(HttpResponse response) { + final FullHttpResponse fullHttpResponse = (FullHttpResponse) response; + return new DefaultHttpContent(fullHttpResponse.content()); + } + + private static HttpContent createContent(HttpChunk chunk) { + return new DefaultHttpContent(Unpooled.copiedBuffer(BytesReference.toBytes(chunk.content()))); + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java new file mode 100644 index 0000000000000..4e86b7d9d3173 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.http.AbstractHttpServerTransport; +import org.opensearch.http.HttpChunk; +import org.opensearch.http.HttpRequest; +import org.opensearch.http.StreamingHttpChannel; + +import java.util.function.Consumer; + +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.LastHttpContent; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import reactor.netty.http.server.HttpServerRequest; +import reactor.netty.http.server.HttpServerResponse; + +class ReactorNetty4StreamingRequestConsumer implements Consumer, Publisher { + private final AbstractHttpServerTransport transport; + private final HttpServerRequest request; + private final HttpContentSender sender; + private final StreamingHttpChannel httpChannel; + + ReactorNetty4StreamingRequestConsumer(AbstractHttpServerTransport transport, HttpServerRequest request, HttpServerResponse response) { + this.transport = transport; + this.request = request; + this.sender = new ReactorNetty4StreamingResponseProducer(); + this.httpChannel = new ReactorNetty4StreamingHttpChannel(request, response, sender); + } + + @Override + public void accept(T message) { + if (message instanceof LastHttpContent) { + httpChannel.receiveChunk(createChunk(message, true)); + } else if (message instanceof HttpContent) { + httpChannel.receiveChunk(createChunk(message, false)); + } + } + + @Override + public void subscribe(Subscriber s) { + sender.subscribe(s); + } + + void start() { + transport.incomingStream(createRequest(request), httpChannel); + } + + HttpRequest createRequest(HttpServerRequest request) { + return new ReactorNetty4HttpRequest(request, Unpooled.EMPTY_BUFFER); + } + + HttpChunk createChunk(HttpContent chunk, boolean last) { + return new ReactorNetty4HttpChunk(chunk.content(), last); + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java new file mode 100644 index 0000000000000..b636f3a3c6ac9 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.core.action.ActionListener; + +import io.netty.handler.codec.http.HttpContent; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; + +class ReactorNetty4StreamingResponseProducer implements HttpContentSender { + private final Publisher sender; + private volatile FluxSink emitter; + + ReactorNetty4StreamingResponseProducer() { + this.sender = Flux.create(emitter -> this.emitter = emitter); + } + + @Override + public void send(HttpContent content, ActionListener listener, boolean isLast) { + try { + emitter.next(content); + listener.onResponse(null); + if (isLast) { + emitter.complete(); + } + } catch (final Exception ex) { + emitter.error(ex); + listener.onFailure(ex); + } + } + + @Override + public void subscribe(Subscriber s) { + sender.subscribe(s); + } +} diff --git a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java index 257aca2b67990..991fbf12072be 100644 --- a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java @@ -357,6 +357,16 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) { logger.trace(() -> new ParameterizedMessage("Http channel accepted: {}", httpChannel)); } + /** + * This method handles an incoming http request as a stream. + * + * @param httpRequest that is incoming + * @param httpChannel that received the http request + */ + public void incomingStream(HttpRequest httpRequest, final StreamingHttpChannel httpChannel) { + handleIncomingRequest(httpRequest, httpChannel, httpRequest.getInboundException()); + } + /** * This method handles an incoming http request. * @@ -438,29 +448,56 @@ private void handleIncomingRequest(final HttpRequest httpRequest, final HttpChan RestChannel innerChannel; ThreadContext threadContext = threadPool.getThreadContext(); try { - innerChannel = new DefaultRestChannel( - httpChannel, - httpRequest, - restRequest, - bigArrays, - handlingSettings, - threadContext, - corsHandler, - trace - ); + if (httpChannel instanceof StreamingHttpChannel) { + innerChannel = new DefaultStreamingRestChannel( + (StreamingHttpChannel) httpChannel, + httpRequest, + restRequest, + bigArrays, + handlingSettings, + threadContext, + corsHandler, + trace + ); + } else { + innerChannel = new DefaultRestChannel( + httpChannel, + httpRequest, + restRequest, + bigArrays, + handlingSettings, + threadContext, + corsHandler, + trace + ); + } } catch (final IllegalArgumentException e) { badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e); final RestRequest innerRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel); - innerChannel = new DefaultRestChannel( - httpChannel, - httpRequest, - innerRequest, - bigArrays, - handlingSettings, - threadContext, - corsHandler, - trace - ); + + if (httpChannel instanceof StreamingHttpChannel) { + innerChannel = new DefaultStreamingRestChannel( + (StreamingHttpChannel) httpChannel, + httpRequest, + innerRequest, + bigArrays, + handlingSettings, + threadContext, + corsHandler, + trace + ); + } else { + innerChannel = new DefaultRestChannel( + httpChannel, + httpRequest, + innerRequest, + bigArrays, + handlingSettings, + threadContext, + corsHandler, + trace + ); + } } channel = innerChannel; } diff --git a/server/src/main/java/org/opensearch/http/DefaultRestChannel.java b/server/src/main/java/org/opensearch/http/DefaultRestChannel.java index 7084600133a75..497ec799ff937 100644 --- a/server/src/main/java/org/opensearch/http/DefaultRestChannel.java +++ b/server/src/main/java/org/opensearch/http/DefaultRestChannel.java @@ -58,11 +58,11 @@ /** * The default rest channel for incoming requests. This class implements the basic logic for sending a rest - * response. It will set necessary headers nad ensure that bytes are released after the response is sent. + * response. It will set necessary headers and ensure that bytes are released after the response is sent. * * @opensearch.internal */ -public class DefaultRestChannel extends AbstractRestChannel implements RestChannel { +class DefaultRestChannel extends AbstractRestChannel implements RestChannel { static final String CLOSE = "close"; static final String CONNECTION = "connection"; diff --git a/server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java b/server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java new file mode 100644 index 0000000000000..3a5c699c1f978 --- /dev/null +++ b/server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java @@ -0,0 +1,134 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http; + +import org.opensearch.common.Nullable; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.ReleasableBytesStreamOutput; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lease.Releasables; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.StreamingRestChannel; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.reactivestreams.Subscriber; + +import static org.opensearch.tasks.Task.X_OPAQUE_ID; + +/** + * The streaming rest channel for incoming requests. This class implements the logic for sending a streaming + * rest in chunks response. It will set necessary headers and ensure that bytes are released after the full + * response is sent. + * + * @opensearch.internal + */ +class DefaultStreamingRestChannel extends DefaultRestChannel implements StreamingRestChannel { + private final StreamingHttpChannel streamingHttpChannel; + @Nullable + private final HttpTracer tracerLog; + + /** + * Wraps the instance of the {@link XContentBuilder} into {@link HttpChunk} + */ + private final static class XContentHttpChunk implements HttpChunk { + private final BytesReference content; + + private XContentHttpChunk(@Nullable final XContentBuilder builder) { + if (builder == null /* no content */) { + content = BytesArray.EMPTY; + } else { + content = BytesReference.bytes(builder); + } + } + + @Override + public boolean isLast() { + return content == BytesArray.EMPTY; + } + + @Override + public BytesReference content() { + return content; + } + + @Override + public void release() {} + } + + DefaultStreamingRestChannel( + StreamingHttpChannel streamingHttpChannel, + HttpRequest httpRequest, + RestRequest request, + BigArrays bigArrays, + HttpHandlingSettings settings, + ThreadContext threadContext, + CorsHandler corsHandler, + @Nullable HttpTracer tracerLog + ) { + super(streamingHttpChannel, httpRequest, request, bigArrays, settings, threadContext, corsHandler, tracerLog); + this.streamingHttpChannel = streamingHttpChannel; + this.tracerLog = tracerLog; + } + + @Override + public void subscribe(Subscriber subscriber) { + streamingHttpChannel.subscribe(subscriber); + } + + @Override + public void sendChunk(XContentBuilder builder) { + final HttpChunk chunk = new XContentHttpChunk(builder); + + String opaque = null; + boolean success = false; + final List toClose = new ArrayList<>(3); + String contentLength = null; + + try { + opaque = request.header(X_OPAQUE_ID); + contentLength = String.valueOf(chunk.content().length()); + + final BytesReference content = chunk.content(); + if (content instanceof Releasable) { + toClose.add((Releasable) content); + } + + BytesStreamOutput bytesStreamOutput = bytesOutputOrNull(); + if (bytesStreamOutput instanceof ReleasableBytesStreamOutput) { + toClose.add((Releasable) bytesStreamOutput); + } + + ActionListener listener = ActionListener.wrap(() -> Releasables.close(toClose)); + streamingHttpChannel.sendChunk(chunk, listener); + success = true; + } finally { + if (success == false) { + Releasables.close(toClose); + } + if (tracerLog != null) { + tracerLog.traceChunk(chunk, streamingHttpChannel, contentLength, opaque, request.getRequestId(), success); + } + } + } + + @Override + public void prepareResponse(RestStatus status, Map> headers) { + streamingHttpChannel.prepareResponse(status.getStatus(), headers); + } +} diff --git a/server/src/main/java/org/opensearch/http/HttpChunk.java b/server/src/main/java/org/opensearch/http/HttpChunk.java new file mode 100644 index 0000000000000..32438a21b799d --- /dev/null +++ b/server/src/main/java/org/opensearch/http/HttpChunk.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http; + +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.common.bytes.BytesReference; + +/** + * Represents a chunk of the HTTP request / response stream + */ +@PublicApi(since = "2.15.0") +public interface HttpChunk { + /** + * Signals this is the last chunk of the stream. + * @return "true" if this is the last chunk of the stream, "false" otherwise + */ + boolean isLast(); + + /** + * Returns the content of this chunk + * @return the content of this chunk + */ + BytesReference content(); + + /** + * Releases all possible resources associated with this chunk + */ + void release(); +} diff --git a/server/src/main/java/org/opensearch/http/HttpServerTransport.java b/server/src/main/java/org/opensearch/http/HttpServerTransport.java index 012b69c29c1d4..f58d604151fd0 100644 --- a/server/src/main/java/org/opensearch/http/HttpServerTransport.java +++ b/server/src/main/java/org/opensearch/http/HttpServerTransport.java @@ -38,8 +38,12 @@ import org.opensearch.core.common.transport.BoundTransportAddress; import org.opensearch.core.service.ReportingService; import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestHandler; import org.opensearch.rest.RestRequest; +import java.util.Map; +import java.util.Optional; + /** * HTTP Transport server * @@ -61,6 +65,17 @@ public interface HttpServerTransport extends LifecycleComponent, ReportingServic * Dispatches HTTP requests. */ interface Dispatcher { + /** + * Finds the matching {@link RestHandler} that the request is going to be dispatched to, if any. + * @param uri request URI + * @param rawPath request raw path + * @param method request HTTP method + * @param params request parameters + * @return matching {@link RestHandler} that the request is going to be dispatched to, {@code Optional.empty()} if none match + */ + default Optional dispatchHandler(String uri, String rawPath, RestRequest.Method method, Map params) { + return Optional.empty(); + } /** * Dispatches the {@link RestRequest} to the relevant request handler or responds to the given rest channel directly if diff --git a/server/src/main/java/org/opensearch/http/HttpTracer.java b/server/src/main/java/org/opensearch/http/HttpTracer.java index 7a763b9ffb790..de1da4a20e294 100644 --- a/server/src/main/java/org/opensearch/http/HttpTracer.java +++ b/server/src/main/java/org/opensearch/http/HttpTracer.java @@ -128,6 +128,36 @@ void traceResponse( ); } + /** + * Logs the response chunk to a request that was logged by {@link #maybeTraceRequest(RestRequest, Exception)}. + * + * @param chunk response chunk + * @param httpChannel HttpChannel the response was sent on + * @param contentLength Value of the response content length header + * @param opaqueHeader Value of HTTP header {@link Task#X_OPAQUE_ID} + * @param requestId Request id as returned by {@link RestRequest#getRequestId()} + * @param success Whether the response was successfully sent + */ + void traceChunk( + HttpChunk chunk, + StreamingHttpChannel httpChannel, + String contentLength, + String opaqueHeader, + long requestId, + boolean success + ) { + logger.trace( + new ParameterizedMessage( + "[{}][{}][{}] sent next chunk to [{}] success [{}]", + requestId, + opaqueHeader, + contentLength, + httpChannel, + success + ) + ); + } + private void setTracerLogInclude(List tracerLogInclude) { this.tracerLogInclude = tracerLogInclude.toArray(Strings.EMPTY_ARRAY); } diff --git a/server/src/main/java/org/opensearch/http/StreamingHttpChannel.java b/server/src/main/java/org/opensearch/http/StreamingHttpChannel.java new file mode 100644 index 0000000000000..22743f423ad92 --- /dev/null +++ b/server/src/main/java/org/opensearch/http/StreamingHttpChannel.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http; + +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.action.ActionListener; + +import java.util.List; +import java.util.Map; + +import org.reactivestreams.Publisher; + +/** + * Represents an HTTP communication channel with streaming capabilities. + * + * @opensearch.api + */ +@PublicApi(since = "2.15.0") +public interface StreamingHttpChannel extends HttpChannel, Publisher { + /** + * Sends the next {@link HttpChunk} to the response stream + * @param chunk response chunk to send to channel + */ + void sendChunk(HttpChunk chunk, ActionListener listener); + + /** + * Receives the next {@link HttpChunk} from the request stream + * @param chunk next {@link HttpChunk} + */ + void receiveChunk(HttpChunk chunk); + + /** + * Prepares response before kicking of content streaming + * @param status response status + * @param headers response headers + */ + void prepareResponse(int status, Map> headers); +} diff --git a/server/src/main/java/org/opensearch/rest/BaseRestHandler.java b/server/src/main/java/org/opensearch/rest/BaseRestHandler.java index fc150405747ec..e76962c31839d 100644 --- a/server/src/main/java/org/opensearch/rest/BaseRestHandler.java +++ b/server/src/main/java/org/opensearch/rest/BaseRestHandler.java @@ -317,6 +317,11 @@ public boolean allowsUnsafeBuffers() { public boolean allowSystemIndexAccessByDefault() { return delegate.allowSystemIndexAccessByDefault(); } + + @Override + public boolean supportsStreaming() { + return delegate.supportsStreaming(); + } } /** diff --git a/server/src/main/java/org/opensearch/rest/RestController.java b/server/src/main/java/org/opensearch/rest/RestController.java index 95abb9b3daeca..4487508e80a3a 100644 --- a/server/src/main/java/org/opensearch/rest/RestController.java +++ b/server/src/main/java/org/opensearch/rest/RestController.java @@ -54,6 +54,7 @@ import org.opensearch.core.xcontent.MediaType; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.http.HttpChunk; import org.opensearch.http.HttpServerTransport; import org.opensearch.identity.IdentityService; import org.opensearch.identity.Subject; @@ -71,12 +72,16 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.function.UnaryOperator; import java.util.stream.Collectors; +import org.reactivestreams.Subscriber; +import reactor.core.publisher.Mono; + import static org.opensearch.cluster.metadata.IndexNameExpressionResolver.SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; import static org.opensearch.core.rest.RestStatus.BAD_REQUEST; import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR; @@ -257,6 +262,32 @@ public void registerHandler(final RestHandler restHandler) { ); } + @Override + public Optional dispatchHandler(String uri, String rawPath, RestRequest.Method method, Map params) { + // Loop through all possible handlers, attempting to dispatch the request + final Iterator allHandlers = getAllRestMethodHandlers(params, rawPath); + + while (allHandlers.hasNext()) { + final RestHandler handler; + final RestMethodHandlers handlers = allHandlers.next(); + if (handlers == null) { + handler = null; + } else { + handler = handlers.getHandler(method); + } + if (handler == null) { + final Set validMethodSet = getValidHandlerMethodSet(rawPath); + if (validMethodSet.contains(method) == false) { + return Optional.empty(); + } + } else { + return Optional.of(handler); + } + } + + return Optional.empty(); + } + @Override public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { try { @@ -318,8 +349,25 @@ private void dispatchRequest(RestRequest request, RestChannel channel, RestHandl } else { inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength); } - // iff we could reserve bytes for the request we need to send the response also over this channel - responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength); + + if (handler.supportsStreaming()) { + // The handler may support streaming but not the engine, in this case we fail with the bad request + if (channel instanceof StreamingRestChannel) { + responseChannel = new StreamHandlingHttpChannel((StreamingRestChannel) channel, circuitBreakerService, contentLength); + } else { + throw new IllegalStateException( + "The engine does not support HTTP streaming, unable to serve uri [" + + request.getHttpRequest().uri() + + "] and method [" + + request.getHttpRequest().method() + + "]" + ); + } + } else { + // if we could reserve bytes for the request we need to send the response also over this channel + responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength); + } + // TODO: Count requests double in the circuit breaker if they need copying? if (handler.allowsUnsafeBuffers() == false) { request.ensureSafeBuffers(); @@ -642,7 +690,86 @@ private void close() { } inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength); } + } + + private static final class StreamHandlingHttpChannel implements StreamingRestChannel { + private final StreamingRestChannel delegate; + private final CircuitBreakerService circuitBreakerService; + private final int contentLength; + private final AtomicBoolean closed = new AtomicBoolean(); + + StreamHandlingHttpChannel(StreamingRestChannel delegate, CircuitBreakerService circuitBreakerService, int contentLength) { + this.delegate = delegate; + this.circuitBreakerService = circuitBreakerService; + this.contentLength = contentLength; + } + + @Override + public XContentBuilder newBuilder() throws IOException { + return delegate.newBuilder(); + } + + @Override + public XContentBuilder newErrorBuilder() throws IOException { + return delegate.newErrorBuilder(); + } + + @Override + public XContentBuilder newBuilder(@Nullable MediaType mediaType, boolean useFiltering) throws IOException { + return delegate.newBuilder(mediaType, useFiltering); + } + + @Override + public XContentBuilder newBuilder(MediaType mediaType, MediaType responseContentType, boolean useFiltering) throws IOException { + return delegate.newBuilder(mediaType, responseContentType, useFiltering); + } + + @Override + public BytesStreamOutput bytesOutput() { + return delegate.bytesOutput(); + } + + @Override + public RestRequest request() { + return delegate.request(); + } + + @Override + public boolean detailedErrorsEnabled() { + return delegate.detailedErrorsEnabled(); + } + @Override + public void sendResponse(RestResponse response) { + close(); + // TODO: Check if subscribe() is already called, the headers and status are going to be sent + // over so we need to populate those **before** that, if possible. + prepareResponse(response.status(), Map.of("Content-Type", List.of(response.contentType()))); + Mono.ignoreElements(this).then(Mono.just(response)).subscribe(delegate::sendResponse); + } + + @Override + public void sendChunk(XContentBuilder builder) { + delegate.sendChunk(builder); + } + + @Override + public void prepareResponse(RestStatus status, Map> headers) { + delegate.prepareResponse(status, headers); + } + + @Override + public void subscribe(Subscriber subscriber) { + delegate.subscribe(subscriber); + } + + private void close() { + // attempt to close once atomically + if (closed.compareAndSet(false, true) == false) { + throw new IllegalStateException("Channel is already closed"); + } + inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength); + } } private static CircuitBreaker inFlightRequestsBreaker(CircuitBreakerService circuitBreakerService) { diff --git a/server/src/main/java/org/opensearch/rest/RestHandler.java b/server/src/main/java/org/opensearch/rest/RestHandler.java index 877afdd951088..1139e5fc65f31 100644 --- a/server/src/main/java/org/opensearch/rest/RestHandler.java +++ b/server/src/main/java/org/opensearch/rest/RestHandler.java @@ -72,6 +72,14 @@ default boolean supportsContentStream() { return false; } + /** + * Indicates if the RestHandler supports request / response streaming. Please note that the transport engine has to support + * streaming as well. + */ + default boolean supportsStreaming() { + return false; + } + /** * Indicates if the RestHandler supports working with pooled buffers. If the request handler will not escape the return * {@link RestRequest#content()} or any buffers extracted from it then there is no need to make a copies of any pooled buffers in the diff --git a/server/src/main/java/org/opensearch/rest/StreamingRestChannel.java b/server/src/main/java/org/opensearch/rest/StreamingRestChannel.java new file mode 100644 index 0000000000000..60111331bb128 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/StreamingRestChannel.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest; + +import org.opensearch.common.Nullable; +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.http.HttpChunk; + +import java.util.List; +import java.util.Map; + +import org.reactivestreams.Publisher; + +/** + * A streaming channel used to prepare response and sending the response in chunks. + * + * @opensearch.api + */ +@PublicApi(since = "2.15.0") +public interface StreamingRestChannel extends RestChannel, Publisher { + /** + * Sends the next {@link HttpChunk} to the response stream + * @param builder response chunk as an instance of the {@link XContentBuilder} to send to channel, the {@code null} value indicates no content. + */ + void sendChunk(@Nullable XContentBuilder builder); + + /** + * Prepares response before kicking of content streaming + * @param status response status + * @param headers response headers + */ + void prepareResponse(RestStatus status, Map> headers); +}