From 11cc4fa70bfdac0b1d34c289fd7c3d674ea0f4cc Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Tue, 11 Jun 2024 08:25:37 -0400 Subject: [PATCH] [Streaming Indexing] Enhance RestAction with request / response streaming support (#13772) (#14149) Signed-off-by: Andriy Redko (cherry picked from commit c71060e29f56918b87a78237ff117d9018456428) Signed-off-by: kkewwei --- CHANGELOG.md | 1 + .../client/RestHighLevelClient.java | 6 +- .../OpenSearchDashboardsPlugin.java | 2 + .../netty4/ReactorNetty4HttpChunk.java | 49 ++++ .../netty4/ReactorNetty4HttpRequest.java | 4 + .../ReactorNetty4HttpServerTransport.java | 56 +++-- ...ReactorNetty4NonStreamingHttpChannel.java} | 4 +- ...torNetty4NonStreamingRequestConsumer.java} | 8 +- .../ReactorNetty4StreamingHttpChannel.java | 132 +++++++++++ ...ReactorNetty4StreamingRequestConsumer.java | 53 +++++ ...eactorNetty4StreamingResponseProducer.java | 54 +++++ .../netty4/StreamingHttpContentSender.java | 32 +++ .../reactor/netty4/ReactorHttpClient.java | 52 +++++ ...tty4HttpServerTransportStreamingTests.java | 211 ++++++++++++++++++ .../org/opensearch/action/ActionModule.java | 2 + .../xcontent/support/XContentHttpChunk.java | 65 ++++++ .../http/AbstractHttpServerTransport.java | 77 +++++-- .../opensearch/http/DefaultRestChannel.java | 4 +- .../http/DefaultStreamingRestChannel.java | 107 +++++++++ .../java/org/opensearch/http/HttpChunk.java | 33 +++ .../opensearch/http/HttpServerTransport.java | 15 ++ .../java/org/opensearch/http/HttpTracer.java | 30 +++ .../opensearch/http/StreamingHttpChannel.java | 56 +++++ .../org/opensearch/rest/BaseRestHandler.java | 16 ++ .../org/opensearch/rest/RestController.java | 146 +++++++++++- .../java/org/opensearch/rest/RestHandler.java | 8 + .../opensearch/rest/StreamingRestChannel.java | 51 +++++ .../document/RestBulkStreamingAction.java | 199 +++++++++++++++++ .../RestBulkStreamingActionTests.java | 89 ++++++++ 29 files changed, 1513 insertions(+), 49 deletions(-) create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java rename plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/{NonStreamingHttpChannel.java => ReactorNetty4NonStreamingHttpChannel.java} (92%) rename plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/{NonStreamingRequestConsumer.java => ReactorNetty4NonStreamingRequestConsumer.java} (89%) create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/StreamingHttpContentSender.java create mode 100644 plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java create mode 100644 server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java create mode 100644 server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java create mode 100644 server/src/main/java/org/opensearch/http/HttpChunk.java create mode 100644 server/src/main/java/org/opensearch/http/StreamingHttpChannel.java create mode 100644 server/src/main/java/org/opensearch/rest/StreamingRestChannel.java create mode 100644 server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java create mode 100644 server/src/test/java/org/opensearch/rest/action/document/RestBulkStreamingActionTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index e3505513d8453..a7a93615c7d18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add recovery chunk size setting ([#13997](https://github.com/opensearch-project/OpenSearch/pull/13997)) - [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982)) - [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374)) +- [Streaming Indexing] Enhance RestAction with request / response streaming support ([#13772](https://github.com/opensearch-project/OpenSearch/pull/13772)) - Move Remote Store Migration from DocRep to GA and modify remote migration settings name ([#14100](https://github.com/opensearch-project/OpenSearch/pull/14100)) - [Remote State] Add async remote state deletion task running on an interval, configurable by a setting ([#13995](https://github.com/opensearch-project/OpenSearch/pull/13995)) - Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304)) diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java index 94303097c772d..d587f76e61b49 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java @@ -2227,11 +2227,11 @@ protected final Resp parseEntity(final HttpEntity entity, final CheckedFu if (entity.getContentType() == null) { throw new IllegalStateException("OpenSearch didn't return the [Content-Type] header, unable to parse response body"); } - MediaType medaiType = MediaType.fromMediaType(entity.getContentType().getValue()); - if (medaiType == null) { + MediaType mediaType = MediaType.fromMediaType(entity.getContentType().getValue()); + if (mediaType == null) { throw new IllegalStateException("Unsupported Content-Type: " + entity.getContentType().getValue()); } - try (XContentParser parser = medaiType.xContent().createParser(registry, DEPRECATION_HANDLER, entity.getContent())) { + try (XContentParser parser = mediaType.xContent().createParser(registry, DEPRECATION_HANDLER, entity.getContent())) { return entityParser.apply(parser); } } diff --git a/modules/opensearch-dashboards/src/main/java/org/opensearch/dashboards/OpenSearchDashboardsPlugin.java b/modules/opensearch-dashboards/src/main/java/org/opensearch/dashboards/OpenSearchDashboardsPlugin.java index 534218ef438e7..5eab33f206ff4 100644 --- a/modules/opensearch-dashboards/src/main/java/org/opensearch/dashboards/OpenSearchDashboardsPlugin.java +++ b/modules/opensearch-dashboards/src/main/java/org/opensearch/dashboards/OpenSearchDashboardsPlugin.java @@ -54,6 +54,7 @@ import org.opensearch.rest.action.admin.indices.RestRefreshAction; import org.opensearch.rest.action.admin.indices.RestUpdateSettingsAction; import org.opensearch.rest.action.document.RestBulkAction; +import org.opensearch.rest.action.document.RestBulkStreamingAction; import org.opensearch.rest.action.document.RestDeleteAction; import org.opensearch.rest.action.document.RestGetAction; import org.opensearch.rest.action.document.RestIndexAction; @@ -127,6 +128,7 @@ public List getRestHandlers( new OpenSearchDashboardsWrappedRestHandler(new RestMultiGetAction(settings)), new OpenSearchDashboardsWrappedRestHandler(new RestSearchAction()), new OpenSearchDashboardsWrappedRestHandler(new RestBulkAction(settings)), + new OpenSearchDashboardsWrappedRestHandler(new RestBulkStreamingAction(settings)), new OpenSearchDashboardsWrappedRestHandler(new RestDeleteAction()), new OpenSearchDashboardsWrappedRestHandler(new RestDeleteByQueryAction()), diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java new file mode 100644 index 0000000000000..3b4a308691e7b --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.http.HttpChunk; +import org.opensearch.transport.reactor.netty4.Netty4Utils; + +import java.util.concurrent.atomic.AtomicBoolean; + +import io.netty.buffer.ByteBuf; + +class ReactorNetty4HttpChunk implements HttpChunk { + private final AtomicBoolean released; + private final boolean pooled; + private final ByteBuf content; + private final boolean last; + + ReactorNetty4HttpChunk(ByteBuf content, boolean last) { + this.content = content; + this.pooled = true; + this.released = new AtomicBoolean(false); + this.last = last; + } + + @Override + public BytesReference content() { + assert released.get() == false; + return Netty4Utils.toBytesReference(content); + } + + @Override + public void close() { + if (pooled && released.compareAndSet(false, true)) { + content.release(); + } + } + + @Override + public boolean isLast() { + return last; + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequest.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequest.java index 4406c555a5b04..491c7aa885103 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequest.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequest.java @@ -44,6 +44,10 @@ class ReactorNetty4HttpRequest implements HttpRequest { private final Exception inboundException; private final boolean pooled; + ReactorNetty4HttpRequest(HttpServerRequest request) { + this(request, new HttpHeadersMap(request.requestHeaders()), new AtomicBoolean(false), false, Unpooled.EMPTY_BUFFER); + } + ReactorNetty4HttpRequest(HttpServerRequest request, ByteBuf content) { this(request, new HttpHeadersMap(request.requestHeaders()), new AtomicBoolean(false), true, content); } diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java index bd1646d753016..906bbfd072da8 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java @@ -26,6 +26,8 @@ import org.opensearch.http.HttpServerChannel; import org.opensearch.http.reactor.netty4.ssl.SslUtils; import org.opensearch.plugins.SecureHttpTransportSettingsProvider; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest.Method; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.reactor.SharedGroupFactory; @@ -40,6 +42,7 @@ import java.time.Duration; import java.util.Arrays; import java.util.List; +import java.util.Optional; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelOption; @@ -351,24 +354,45 @@ public List protocols() { * @return response publisher */ protected Publisher incomingRequest(HttpServerRequest request, HttpServerResponse response) { - final NonStreamingRequestConsumer consumer = new NonStreamingRequestConsumer<>( - this, - request, - response, - maxCompositeBufferComponents + final Method method = HttpConversionUtil.convertMethod(request.method()); + final Optional dispatchHandlerOpt = dispatcher.dispatchHandler( + request.uri(), + request.fullPath(), + method, + request.params() ); + if (dispatchHandlerOpt.map(RestHandler::supportsStreaming).orElse(false)) { + final ReactorNetty4StreamingRequestConsumer consumer = new ReactorNetty4StreamingRequestConsumer<>( + request, + response + ); + + request.receiveContent() + .switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)) + .subscribe(consumer, error -> {}, () -> consumer.accept(DefaultLastHttpContent.EMPTY_LAST_CONTENT)); + + incomingStream(new ReactorNetty4HttpRequest(request), consumer.httpChannel()); + return response.sendObject(consumer); + } else { + final ReactorNetty4NonStreamingRequestConsumer consumer = new ReactorNetty4NonStreamingRequestConsumer<>( + this, + request, + response, + maxCompositeBufferComponents + ); - request.receiveContent().switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)).subscribe(consumer); - - return Mono.from(consumer).flatMap(hc -> { - final FullHttpResponse r = (FullHttpResponse) hc; - response.status(r.status()); - response.trailerHeaders(c -> r.trailingHeaders().forEach(h -> c.add(h.getKey(), h.getValue()))); - response.chunkedTransfer(false); - response.compression(true); - r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue())); - return Mono.from(response.sendObject(r.content())); - }); + request.receiveContent().switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)).subscribe(consumer); + + return Mono.from(consumer).flatMap(hc -> { + final FullHttpResponse r = (FullHttpResponse) hc; + response.status(r.status()); + response.trailerHeaders(c -> r.trailingHeaders().forEach(h -> c.add(h.getKey(), h.getValue()))); + response.chunkedTransfer(false); + response.compression(true); + r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue())); + return Mono.from(response.sendObject(r.content())); + }); + } } /** diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingHttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingHttpChannel.java similarity index 92% rename from plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingHttpChannel.java rename to plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingHttpChannel.java index 98b359319ff1b..7df0b3c0c35fe 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingHttpChannel.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingHttpChannel.java @@ -23,13 +23,13 @@ import reactor.netty.http.server.HttpServerRequest; import reactor.netty.http.server.HttpServerResponse; -class NonStreamingHttpChannel implements HttpChannel { +class ReactorNetty4NonStreamingHttpChannel implements HttpChannel { private final HttpServerRequest request; private final HttpServerResponse response; private final CompletableContext closeContext = new CompletableContext<>(); private final FluxSink emitter; - NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink emitter) { + ReactorNetty4NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink emitter) { this.request = request; this.response = response; this.emitter = emitter; diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingRequestConsumer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingRequestConsumer.java similarity index 89% rename from plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingRequestConsumer.java rename to plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingRequestConsumer.java index d43e23e800e65..c09e7755b1670 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingRequestConsumer.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingRequestConsumer.java @@ -25,7 +25,7 @@ import reactor.netty.http.server.HttpServerRequest; import reactor.netty.http.server.HttpServerResponse; -class NonStreamingRequestConsumer implements Consumer, Publisher, Disposable { +class ReactorNetty4NonStreamingRequestConsumer implements Consumer, Publisher, Disposable { private final HttpServerRequest request; private final HttpServerResponse response; private final CompositeByteBuf content; @@ -34,7 +34,7 @@ class NonStreamingRequestConsumer implements Consumer, private final AtomicBoolean disposed = new AtomicBoolean(false); private volatile FluxSink emitter; - NonStreamingRequestConsumer( + ReactorNetty4NonStreamingRequestConsumer( AbstractHttpServerTransport transport, HttpServerRequest request, HttpServerResponse response, @@ -64,12 +64,12 @@ public void accept(T message) { } } - public void process(HttpContent in, FluxSink emitter) { + void process(HttpContent in, FluxSink emitter) { // Consume request body in full before dispatching it content.addComponent(true, in.content().retain()); if (in instanceof LastHttpContent) { - final NonStreamingHttpChannel channel = new NonStreamingHttpChannel(request, response, emitter); + final ReactorNetty4NonStreamingHttpChannel channel = new ReactorNetty4NonStreamingHttpChannel(request, response, emitter); final HttpRequest r = createRequest(request, content); try { diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java new file mode 100644 index 0000000000000..56dadea0477c5 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java @@ -0,0 +1,132 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.common.concurrent.CompletableContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.http.HttpChunk; +import org.opensearch.http.HttpResponse; +import org.opensearch.http.StreamingHttpChannel; +import org.opensearch.transport.reactor.netty4.Netty4Utils; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; + +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpContent; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.netty.http.server.HttpServerRequest; +import reactor.netty.http.server.HttpServerResponse; + +class ReactorNetty4StreamingHttpChannel implements StreamingHttpChannel { + private final HttpServerRequest request; + private final HttpServerResponse response; + private final CompletableContext closeContext = new CompletableContext<>(); + private final Publisher receiver; + private final StreamingHttpContentSender sender; + private volatile FluxSink producer; + private volatile boolean lastChunkReceived = false; + + ReactorNetty4StreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, StreamingHttpContentSender sender) { + this.request = request; + this.response = response; + this.sender = sender; + this.receiver = Flux.create(producer -> this.producer = producer); + this.request.withConnection(connection -> Netty4Utils.addListener(connection.channel().closeFuture(), closeContext)); + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() { + request.withConnection(connection -> connection.channel().close()); + } + + @Override + public void addCloseListener(ActionListener listener) { + closeContext.addListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public void sendChunk(HttpChunk chunk, ActionListener listener) { + sender.send(createContent(chunk), listener, chunk.isLast()); + } + + @Override + public void sendResponse(HttpResponse response, ActionListener listener) { + sender.send(createContent(response), listener, true); + } + + @Override + public void prepareResponse(int status, Map> headers) { + this.response.status(status); + headers.forEach((k, vs) -> vs.forEach(v -> this.response.addHeader(k, v))); + } + + @Override + public InetSocketAddress getRemoteAddress() { + return (InetSocketAddress) response.remoteAddress(); + } + + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) response.hostAddress(); + } + + @Override + public void receiveChunk(HttpChunk message) { + try { + if (lastChunkReceived) { + return; + } + + producer.next(message); + if (message.isLast()) { + lastChunkReceived = true; + producer.complete(); + } + } finally { + message.close(); + } + } + + @Override + public boolean isReadable() { + return producer != null; + } + + @Override + public boolean isWritable() { + return sender.isReady(); + } + + @Override + public void subscribe(Subscriber subscriber) { + receiver.subscribe(subscriber); + } + + private static HttpContent createContent(HttpResponse response) { + final FullHttpResponse fullHttpResponse = (FullHttpResponse) response; + return new DefaultHttpContent(fullHttpResponse.content()); + } + + private static HttpContent createContent(HttpChunk chunk) { + return new DefaultHttpContent(Unpooled.copiedBuffer(BytesReference.toByteBuffers(chunk.content()))); + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java new file mode 100644 index 0000000000000..f34f54e561021 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.http.HttpChunk; +import org.opensearch.http.StreamingHttpChannel; + +import java.util.function.Consumer; + +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.LastHttpContent; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import reactor.netty.http.server.HttpServerRequest; +import reactor.netty.http.server.HttpServerResponse; + +class ReactorNetty4StreamingRequestConsumer implements Consumer, Publisher { + private final ReactorNetty4StreamingResponseProducer sender; + private final StreamingHttpChannel httpChannel; + + ReactorNetty4StreamingRequestConsumer(HttpServerRequest request, HttpServerResponse response) { + this.sender = new ReactorNetty4StreamingResponseProducer(); + this.httpChannel = new ReactorNetty4StreamingHttpChannel(request, response, sender); + } + + @Override + public void accept(T message) { + if (message instanceof LastHttpContent) { + httpChannel.receiveChunk(createChunk(message, true)); + } else if (message instanceof HttpContent) { + httpChannel.receiveChunk(createChunk(message, false)); + } + } + + @Override + public void subscribe(Subscriber s) { + sender.subscribe(s); + } + + HttpChunk createChunk(HttpContent chunk, boolean last) { + return new ReactorNetty4HttpChunk(chunk.content().retain(), last); + } + + StreamingHttpChannel httpChannel() { + return httpChannel; + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java new file mode 100644 index 0000000000000..616edccdfc396 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java @@ -0,0 +1,54 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.core.action.ActionListener; + +import io.netty.handler.codec.http.HttpContent; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; + +class ReactorNetty4StreamingResponseProducer implements StreamingHttpContentSender, Publisher { + private final Publisher sender; + private volatile FluxSink emitter; + + ReactorNetty4StreamingResponseProducer() { + this.sender = Flux.create(emitter -> this.emitter = emitter); + } + + @Override + public void send(HttpContent content, ActionListener listener, boolean isLast) { + try { + emitter.next(content); + listener.onResponse(null); + if (isLast) { + emitter.complete(); + } + } catch (final Exception ex) { + emitter.error(ex); + listener.onFailure(ex); + } + } + + @Override + public void subscribe(Subscriber s) { + sender.subscribe(s); + } + + @Override + public boolean isReady() { + return emitter != null; + } + + FluxSink emitter() { + return emitter; + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/StreamingHttpContentSender.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/StreamingHttpContentSender.java new file mode 100644 index 0000000000000..f07d6fbb88349 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/StreamingHttpContentSender.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.core.action.ActionListener; + +import io.netty.handler.codec.http.HttpContent; + +/** + * The generic interface for chunked {@link HttpContent} producers (response streaming). + */ +interface StreamingHttpContentSender { + /** + * Sends the next {@link HttpContent} over the wire + * @param content next {@link HttpContent} + * @param listener action listener + * @param isLast {@code true} if this is the last chunk, {@code false} otherwise + */ + void send(HttpContent content, ActionListener listener, boolean isLast); + + /** + * Returns {@code true} is this channel is ready for streaming response data, {@code false} otherwise + * @return {@code true} is this channel is ready for streaming response data, {@code false} otherwise + */ + boolean isReady(); +} diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java index 920c895205023..0953e51484bd3 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java @@ -14,16 +14,22 @@ package org.opensearch.http.reactor.netty4; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.tasks.Task; import org.opensearch.test.OpenSearchTestCase; import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.stream.Stream; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -36,6 +42,7 @@ import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http2.HttpConversionUtil; @@ -121,6 +128,11 @@ public final FullHttpResponse send(InetSocketAddress remoteAddress, FullHttpRequ return responses.get(0); } + public final FullHttpResponse stream(InetSocketAddress remoteAddress, HttpRequest httpRequest, Stream stream) + throws InterruptedException { + return sendRequestStream(remoteAddress, httpRequest, stream); + } + public final FullHttpResponse send(InetSocketAddress remoteAddress, FullHttpRequest httpRequest, HttpContent content) throws InterruptedException { final List responses = sendRequests( @@ -207,6 +219,46 @@ private List sendRequests( } } + private FullHttpResponse sendRequestStream( + final InetSocketAddress remoteAddress, + final HttpRequest request, + final Stream stream + ) { + final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); + try { + final HttpClient client = createClient(remoteAddress, eventLoopGroup); + + return client.headers(h -> h.add(request.headers())) + .baseUrl(request.getUri()) + .request(request.method()) + .send(Flux.fromStream(stream).map(s -> { + try (XContentBuilder builder = XContentType.JSON.contentBuilder()) { + return Unpooled.wrappedBuffer( + s.toXContent(builder, ToXContent.EMPTY_PARAMS).toString().getBytes(StandardCharsets.UTF_8) + ); + } catch (final IOException ex) { + throw new UncheckedIOException(ex); + } + })) + .response( + (r, c) -> c.aggregate() + .map( + b -> new DefaultFullHttpResponse( + r.version(), + r.status(), + b.retain(), + r.responseHeaders(), + EmptyHttpHeaders.INSTANCE + ) + ) + ) + .blockLast(); + + } finally { + eventLoopGroup.shutdownGracefully().awaitUninterruptibly(); + } + } + private HttpClient createClient(final InetSocketAddress remoteAddress, final NioEventLoopGroup eventLoopGroup) { final HttpClient client = HttpClient.newConnection() .resolver(DefaultAddressResolverGroup.INSTANCE) diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java new file mode 100644 index 0000000000000..a7bf71e58e9b6 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java @@ -0,0 +1,211 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.MockBigArrays; +import org.opensearch.common.util.MockPageCacheRecycler; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.common.xcontent.support.XContentHttpChunk; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.http.HttpServerTransport; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestRequest.Method; +import org.opensearch.rest.StreamingRestChannel; +import org.opensearch.telemetry.tracing.noop.NoopTracer; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.reactor.SharedGroupFactory; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests for the {@link ReactorNetty4HttpServerTransport} class with streaming support. + */ +public class ReactorNetty4HttpServerTransportStreamingTests extends OpenSearchTestCase { + private static final Function XCONTENT_CONVERTER = (str) -> new ToXContent() { + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + return builder.startObject().field("doc", str).endObject(); + } + }; + + private NetworkService networkService; + private ThreadPool threadPool; + private MockBigArrays bigArrays; + private ClusterSettings clusterSettings; + + @Before + public void setup() throws Exception { + networkService = new NetworkService(Collections.emptyList()); + threadPool = new TestThreadPool("test"); + bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + } + + @After + public void shutdown() throws Exception { + if (threadPool != null) { + threadPool.shutdownNow(); + } + threadPool = null; + networkService = null; + bigArrays = null; + clusterSettings = null; + } + + public void testRequestResponseStreaming() throws InterruptedException { + final String responseString = randomAlphaOfLength(4 * 1024); + final String url = "/stream/"; + + final ToXContent[] chunks = newChunks(responseString); + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + @Override + public Optional dispatchHandler(String uri, String rawPath, Method method, Map params) { + return Optional.of(new RestHandler() { + @Override + public boolean supportsStreaming() { + return true; + } + + @Override + public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception { + logger.error("--> Unexpected request [{}]", request.uri()); + throw new AssertionError(); + } + }); + } + + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + if (url.equals(request.uri())) { + assertThat(channel, instanceOf(StreamingRestChannel.class)); + final StreamingRestChannel streamingChannel = (StreamingRestChannel) channel; + + // Await at most 5 seconds till channel is ready for writing the response stream, fail otherwise + final Mono ready = Mono.fromRunnable(() -> { + while (!streamingChannel.isWritable()) { + Thread.onSpinWait(); + } + }).timeout(Duration.ofSeconds(5)); + + threadPool.executor(ThreadPool.Names.WRITE) + .execute(() -> Flux.concat(Flux.fromArray(newChunks(responseString)).map(e -> { + try (XContentBuilder builder = channel.newBuilder(XContentType.JSON, true)) { + return XContentHttpChunk.from(e.toXContent(builder, ToXContent.EMPTY_PARAMS)); + } catch (final IOException ex) { + throw new UncheckedIOException(ex); + } + }), Mono.just(XContentHttpChunk.last())) + .delaySubscription(ready) + .subscribe(streamingChannel::sendChunk, null, () -> { + if (channel.bytesOutput() instanceof Releasable) { + ((Releasable) channel.bytesOutput()).close(); + } + })); + } else { + logger.error("--> Unexpected successful uri [{}]", request.uri()); + throw new AssertionError(); + } + } + + @Override + public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + logger.error( + new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), + cause + ); + throw new AssertionError(); + } + + }; + + try ( + ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( + Settings.EMPTY, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(Settings.EMPTY), + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + try (ReactorHttpClient client = ReactorHttpClient.create(false)) { + HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); + final FullHttpResponse response = client.stream(remoteAddress.address(), request, Arrays.stream(chunks)); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.OK)); + byte[] bytes = new byte[response.content().readableBytes()]; + response.content().readBytes(bytes); + assertThat(new String(bytes, StandardCharsets.UTF_8), equalTo(Arrays.stream(newChunks(responseString)).map(s -> { + try (XContentBuilder builder = XContentType.JSON.contentBuilder()) { + return s.toXContent(builder, ToXContent.EMPTY_PARAMS).toString(); + } catch (final IOException ex) { + throw new UncheckedIOException(ex); + } + }).collect(Collectors.joining("")))); + } finally { + response.release(); + } + } + } + } + + private static ToXContent[] newChunks(final String responseString) { + final ToXContent[] chunks = new ToXContent[responseString.length() / 16]; + + for (int chunk = 0; chunk < responseString.length(); chunk += 16) { + chunks[chunk / 16] = XCONTENT_CONVERTER.apply(responseString.substring(chunk, chunk + 16)); + } + + return chunks; + } +} diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index e771399607c5b..e353741ab86f9 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -434,6 +434,7 @@ import org.opensearch.rest.action.cat.RestTemplatesAction; import org.opensearch.rest.action.cat.RestThreadPoolAction; import org.opensearch.rest.action.document.RestBulkAction; +import org.opensearch.rest.action.document.RestBulkStreamingAction; import org.opensearch.rest.action.document.RestDeleteAction; import org.opensearch.rest.action.document.RestGetAction; import org.opensearch.rest.action.document.RestGetSourceAction; @@ -872,6 +873,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestTermVectorsAction()); registerHandler.accept(new RestMultiTermVectorsAction()); registerHandler.accept(new RestBulkAction(settings)); + registerHandler.accept(new RestBulkStreamingAction(settings)); registerHandler.accept(new RestUpdateAction()); registerHandler.accept(new RestSearchAction()); diff --git a/server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java b/server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java new file mode 100644 index 0000000000000..15b63a0ac2030 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.xcontent.support; + +import org.opensearch.common.Nullable; +import org.opensearch.common.lease.Releasable; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.http.HttpChunk; + +/** + * Wraps the instance of the {@link XContentBuilder} into {@link HttpChunk} + */ +public final class XContentHttpChunk implements HttpChunk { + private final BytesReference content; + + /** + * Creates a new {@link HttpChunk} from {@link XContentBuilder} + * @param builder {@link XContentBuilder} instance + * @return new {@link HttpChunk} instance, if passed {@link XContentBuilder} us {@code null}, a last empty {@link HttpChunk} will be returned + */ + public static HttpChunk from(@Nullable final XContentBuilder builder) { + return new XContentHttpChunk(builder); + } + + /** + * Creates a new last empty {@link HttpChunk} + * @return last empty {@link HttpChunk} instance + */ + public static HttpChunk last() { + return new XContentHttpChunk(null); + } + + private XContentHttpChunk(@Nullable final XContentBuilder builder) { + if (builder == null /* no content */) { + content = BytesArray.EMPTY; + } else { + content = BytesReference.bytes(builder); + } + } + + @Override + public boolean isLast() { + return content == BytesArray.EMPTY; + } + + @Override + public BytesReference content() { + return content; + } + + @Override + public void close() { + if (content instanceof Releasable) { + ((Releasable) content).close(); + } + } +} diff --git a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java index 257aca2b67990..991fbf12072be 100644 --- a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java @@ -357,6 +357,16 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) { logger.trace(() -> new ParameterizedMessage("Http channel accepted: {}", httpChannel)); } + /** + * This method handles an incoming http request as a stream. + * + * @param httpRequest that is incoming + * @param httpChannel that received the http request + */ + public void incomingStream(HttpRequest httpRequest, final StreamingHttpChannel httpChannel) { + handleIncomingRequest(httpRequest, httpChannel, httpRequest.getInboundException()); + } + /** * This method handles an incoming http request. * @@ -438,29 +448,56 @@ private void handleIncomingRequest(final HttpRequest httpRequest, final HttpChan RestChannel innerChannel; ThreadContext threadContext = threadPool.getThreadContext(); try { - innerChannel = new DefaultRestChannel( - httpChannel, - httpRequest, - restRequest, - bigArrays, - handlingSettings, - threadContext, - corsHandler, - trace - ); + if (httpChannel instanceof StreamingHttpChannel) { + innerChannel = new DefaultStreamingRestChannel( + (StreamingHttpChannel) httpChannel, + httpRequest, + restRequest, + bigArrays, + handlingSettings, + threadContext, + corsHandler, + trace + ); + } else { + innerChannel = new DefaultRestChannel( + httpChannel, + httpRequest, + restRequest, + bigArrays, + handlingSettings, + threadContext, + corsHandler, + trace + ); + } } catch (final IllegalArgumentException e) { badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e); final RestRequest innerRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel); - innerChannel = new DefaultRestChannel( - httpChannel, - httpRequest, - innerRequest, - bigArrays, - handlingSettings, - threadContext, - corsHandler, - trace - ); + + if (httpChannel instanceof StreamingHttpChannel) { + innerChannel = new DefaultStreamingRestChannel( + (StreamingHttpChannel) httpChannel, + httpRequest, + innerRequest, + bigArrays, + handlingSettings, + threadContext, + corsHandler, + trace + ); + } else { + innerChannel = new DefaultRestChannel( + httpChannel, + httpRequest, + innerRequest, + bigArrays, + handlingSettings, + threadContext, + corsHandler, + trace + ); + } } channel = innerChannel; } diff --git a/server/src/main/java/org/opensearch/http/DefaultRestChannel.java b/server/src/main/java/org/opensearch/http/DefaultRestChannel.java index ef93914b6677c..7e85525bc4d1f 100644 --- a/server/src/main/java/org/opensearch/http/DefaultRestChannel.java +++ b/server/src/main/java/org/opensearch/http/DefaultRestChannel.java @@ -57,11 +57,11 @@ /** * The default rest channel for incoming requests. This class implements the basic logic for sending a rest - * response. It will set necessary headers nad ensure that bytes are released after the response is sent. + * response. It will set necessary headers and ensure that bytes are released after the response is sent. * * @opensearch.internal */ -public class DefaultRestChannel extends AbstractRestChannel implements RestChannel { +class DefaultRestChannel extends AbstractRestChannel implements RestChannel { static final String CLOSE = "close"; static final String CONNECTION = "connection"; diff --git a/server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java b/server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java new file mode 100644 index 0000000000000..7d8445294a4f3 --- /dev/null +++ b/server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http; + +import org.opensearch.common.Nullable; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.ReleasableBytesStreamOutput; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lease.Releasables; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.StreamingRestChannel; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.reactivestreams.Subscriber; + +import static org.opensearch.tasks.Task.X_OPAQUE_ID; + +/** + * The streaming rest channel for incoming requests. This class implements the logic for sending a streaming + * rest in chunks response. It will set necessary headers and ensure that bytes are released after the full + * response is sent. + * + * @opensearch.internal + */ +class DefaultStreamingRestChannel extends DefaultRestChannel implements StreamingRestChannel { + private final StreamingHttpChannel streamingHttpChannel; + @Nullable + private final HttpTracer tracerLog; + + DefaultStreamingRestChannel( + StreamingHttpChannel streamingHttpChannel, + HttpRequest httpRequest, + RestRequest request, + BigArrays bigArrays, + HttpHandlingSettings settings, + ThreadContext threadContext, + CorsHandler corsHandler, + @Nullable HttpTracer tracerLog + ) { + super(streamingHttpChannel, httpRequest, request, bigArrays, settings, threadContext, corsHandler, tracerLog); + this.streamingHttpChannel = streamingHttpChannel; + this.tracerLog = tracerLog; + } + + @Override + public void subscribe(Subscriber subscriber) { + streamingHttpChannel.subscribe(subscriber); + } + + @Override + public void sendChunk(HttpChunk chunk) { + String opaque = null; + boolean success = false; + final List toClose = new ArrayList<>(3); + String contentLength = null; + + try { + opaque = request.header(X_OPAQUE_ID); + contentLength = String.valueOf(chunk.content().length()); + toClose.add(chunk); + + BytesStreamOutput bytesStreamOutput = newBytesOutput(); + if (bytesStreamOutput instanceof ReleasableBytesStreamOutput) { + toClose.add((Releasable) bytesStreamOutput); + } + + ActionListener listener = ActionListener.wrap(() -> Releasables.close(toClose)); + streamingHttpChannel.sendChunk(chunk, listener); + success = true; + } finally { + if (success == false) { + Releasables.close(toClose); + } + if (tracerLog != null) { + tracerLog.traceChunk(chunk, streamingHttpChannel, contentLength, opaque, request.getRequestId(), success); + } + } + } + + @Override + public void prepareResponse(RestStatus status, Map> headers) { + streamingHttpChannel.prepareResponse(status.getStatus(), headers); + } + + @Override + public boolean isReadable() { + return streamingHttpChannel.isReadable(); + } + + @Override + public boolean isWritable() { + return streamingHttpChannel.isWritable(); + } +} diff --git a/server/src/main/java/org/opensearch/http/HttpChunk.java b/server/src/main/java/org/opensearch/http/HttpChunk.java new file mode 100644 index 0000000000000..7bcb526fe17bb --- /dev/null +++ b/server/src/main/java/org/opensearch/http/HttpChunk.java @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.lease.Releasable; +import org.opensearch.core.common.bytes.BytesReference; + +/** + * Represents a chunk of the HTTP request / response stream + * + * @opensearch.experimental + */ +@ExperimentalApi +public interface HttpChunk extends Releasable { + /** + * Signals this is the last chunk of the stream. + * @return "true" if this is the last chunk of the stream, "false" otherwise + */ + boolean isLast(); + + /** + * Returns the content of this chunk + * @return the content of this chunk + */ + BytesReference content(); +} diff --git a/server/src/main/java/org/opensearch/http/HttpServerTransport.java b/server/src/main/java/org/opensearch/http/HttpServerTransport.java index 012b69c29c1d4..f58d604151fd0 100644 --- a/server/src/main/java/org/opensearch/http/HttpServerTransport.java +++ b/server/src/main/java/org/opensearch/http/HttpServerTransport.java @@ -38,8 +38,12 @@ import org.opensearch.core.common.transport.BoundTransportAddress; import org.opensearch.core.service.ReportingService; import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestHandler; import org.opensearch.rest.RestRequest; +import java.util.Map; +import java.util.Optional; + /** * HTTP Transport server * @@ -61,6 +65,17 @@ public interface HttpServerTransport extends LifecycleComponent, ReportingServic * Dispatches HTTP requests. */ interface Dispatcher { + /** + * Finds the matching {@link RestHandler} that the request is going to be dispatched to, if any. + * @param uri request URI + * @param rawPath request raw path + * @param method request HTTP method + * @param params request parameters + * @return matching {@link RestHandler} that the request is going to be dispatched to, {@code Optional.empty()} if none match + */ + default Optional dispatchHandler(String uri, String rawPath, RestRequest.Method method, Map params) { + return Optional.empty(); + } /** * Dispatches the {@link RestRequest} to the relevant request handler or responds to the given rest channel directly if diff --git a/server/src/main/java/org/opensearch/http/HttpTracer.java b/server/src/main/java/org/opensearch/http/HttpTracer.java index 7a763b9ffb790..de1da4a20e294 100644 --- a/server/src/main/java/org/opensearch/http/HttpTracer.java +++ b/server/src/main/java/org/opensearch/http/HttpTracer.java @@ -128,6 +128,36 @@ void traceResponse( ); } + /** + * Logs the response chunk to a request that was logged by {@link #maybeTraceRequest(RestRequest, Exception)}. + * + * @param chunk response chunk + * @param httpChannel HttpChannel the response was sent on + * @param contentLength Value of the response content length header + * @param opaqueHeader Value of HTTP header {@link Task#X_OPAQUE_ID} + * @param requestId Request id as returned by {@link RestRequest#getRequestId()} + * @param success Whether the response was successfully sent + */ + void traceChunk( + HttpChunk chunk, + StreamingHttpChannel httpChannel, + String contentLength, + String opaqueHeader, + long requestId, + boolean success + ) { + logger.trace( + new ParameterizedMessage( + "[{}][{}][{}] sent next chunk to [{}] success [{}]", + requestId, + opaqueHeader, + contentLength, + httpChannel, + success + ) + ); + } + private void setTracerLogInclude(List tracerLogInclude) { this.tracerLogInclude = tracerLogInclude.toArray(Strings.EMPTY_ARRAY); } diff --git a/server/src/main/java/org/opensearch/http/StreamingHttpChannel.java b/server/src/main/java/org/opensearch/http/StreamingHttpChannel.java new file mode 100644 index 0000000000000..9bab25cb537ed --- /dev/null +++ b/server/src/main/java/org/opensearch/http/StreamingHttpChannel.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.action.ActionListener; + +import java.util.List; +import java.util.Map; + +import org.reactivestreams.Publisher; + +/** + * Represents an HTTP communication channel with streaming capabilities. + * + * @opensearch.experimental + */ +@ExperimentalApi +public interface StreamingHttpChannel extends HttpChannel, Publisher { + /** + * Sends the next {@link HttpChunk} to the response stream + * @param chunk response chunk to send to channel + */ + void sendChunk(HttpChunk chunk, ActionListener listener); + + /** + * Receives the next {@link HttpChunk} from the request stream + * @param chunk next {@link HttpChunk} + */ + void receiveChunk(HttpChunk chunk); + + /** + * Prepares response before kicking of content streaming + * @param status response status + * @param headers response headers + */ + void prepareResponse(int status, Map> headers); + + /** + * Returns {@code true} is this channel is ready for streaming request data, {@code false} otherwise + * @return {@code true} is this channel is ready for streaming request data, {@code false} otherwise + */ + boolean isReadable(); + + /** + * Returns {@code true} is this channel is ready for streaming response data, {@code false} otherwise + * @return {@code true} is this channel is ready for streaming response data, {@code false} otherwise + */ + boolean isWritable(); +} diff --git a/server/src/main/java/org/opensearch/rest/BaseRestHandler.java b/server/src/main/java/org/opensearch/rest/BaseRestHandler.java index e3272dec322d3..520061e3fb159 100644 --- a/server/src/main/java/org/opensearch/rest/BaseRestHandler.java +++ b/server/src/main/java/org/opensearch/rest/BaseRestHandler.java @@ -40,6 +40,7 @@ import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; import org.opensearch.client.node.NodeClient; import org.opensearch.common.CheckedConsumer; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Setting; @@ -199,6 +200,16 @@ protected final String unrecognized( @PublicApi(since = "1.0.0") protected interface RestChannelConsumer extends CheckedConsumer {} + /** + * Streaming REST requests are handled by preparing a streaming channel consumer that represents the execution of + * the request against a channel. + * + * @opensearch.experimental + */ + @FunctionalInterface + @ExperimentalApi + protected interface StreamingRestChannelConsumer extends CheckedConsumer {} + /** * Prepare the request for execution. Implementations should consume all request params before * returning the runnable for actual execution. Unconsumed params will immediately terminate @@ -308,6 +319,11 @@ public boolean allowsUnsafeBuffers() { public boolean allowSystemIndexAccessByDefault() { return delegate.allowSystemIndexAccessByDefault(); } + + @Override + public boolean supportsStreaming() { + return delegate.supportsStreaming(); + } } /** diff --git a/server/src/main/java/org/opensearch/rest/RestController.java b/server/src/main/java/org/opensearch/rest/RestController.java index 95abb9b3daeca..0c173523fa7cd 100644 --- a/server/src/main/java/org/opensearch/rest/RestController.java +++ b/server/src/main/java/org/opensearch/rest/RestController.java @@ -54,6 +54,7 @@ import org.opensearch.core.xcontent.MediaType; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.http.HttpChunk; import org.opensearch.http.HttpServerTransport; import org.opensearch.identity.IdentityService; import org.opensearch.identity.Subject; @@ -71,12 +72,16 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.function.UnaryOperator; import java.util.stream.Collectors; +import org.reactivestreams.Subscriber; +import reactor.core.publisher.Mono; + import static org.opensearch.cluster.metadata.IndexNameExpressionResolver.SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; import static org.opensearch.core.rest.RestStatus.BAD_REQUEST; import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR; @@ -257,6 +262,32 @@ public void registerHandler(final RestHandler restHandler) { ); } + @Override + public Optional dispatchHandler(String uri, String rawPath, RestRequest.Method method, Map params) { + // Loop through all possible handlers, attempting to dispatch the request + final Iterator allHandlers = getAllRestMethodHandlers(params, rawPath); + + while (allHandlers.hasNext()) { + final RestHandler handler; + final RestMethodHandlers handlers = allHandlers.next(); + if (handlers == null) { + handler = null; + } else { + handler = handlers.getHandler(method); + } + if (handler == null) { + final Set validMethodSet = getValidHandlerMethodSet(rawPath); + if (validMethodSet.contains(method) == false) { + return Optional.empty(); + } + } else { + return Optional.of(handler); + } + } + + return Optional.empty(); + } + @Override public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { try { @@ -318,8 +349,25 @@ private void dispatchRequest(RestRequest request, RestChannel channel, RestHandl } else { inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength); } - // iff we could reserve bytes for the request we need to send the response also over this channel - responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength); + + if (handler.supportsStreaming()) { + // The handler may support streaming but not the engine, in this case we fail with the bad request + if (channel instanceof StreamingRestChannel) { + responseChannel = new StreamHandlingHttpChannel((StreamingRestChannel) channel, circuitBreakerService, contentLength); + } else { + throw new IllegalStateException( + "The engine does not support HTTP streaming, unable to serve uri [" + + request.getHttpRequest().uri() + + "] and method [" + + request.getHttpRequest().method() + + "]" + ); + } + } else { + // if we could reserve bytes for the request we need to send the response also over this channel + responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength); + } + // TODO: Count requests double in the circuit breaker if they need copying? if (handler.allowsUnsafeBuffers() == false) { request.ensureSafeBuffers(); @@ -642,7 +690,101 @@ private void close() { } inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength); } + } + + private static final class StreamHandlingHttpChannel implements StreamingRestChannel { + private final StreamingRestChannel delegate; + private final CircuitBreakerService circuitBreakerService; + private final int contentLength; + private final AtomicBoolean closed = new AtomicBoolean(); + private final AtomicBoolean subscribed = new AtomicBoolean(); + + StreamHandlingHttpChannel(StreamingRestChannel delegate, CircuitBreakerService circuitBreakerService, int contentLength) { + this.delegate = delegate; + this.circuitBreakerService = circuitBreakerService; + this.contentLength = contentLength; + } + + @Override + public XContentBuilder newBuilder() throws IOException { + return delegate.newBuilder(); + } + + @Override + public XContentBuilder newErrorBuilder() throws IOException { + return delegate.newErrorBuilder(); + } + + @Override + public XContentBuilder newBuilder(@Nullable MediaType mediaType, boolean useFiltering) throws IOException { + return delegate.newBuilder(mediaType, useFiltering); + } + + @Override + public XContentBuilder newBuilder(MediaType mediaType, MediaType responseContentType, boolean useFiltering) throws IOException { + return delegate.newBuilder(mediaType, responseContentType, useFiltering); + } + + @Override + public BytesStreamOutput bytesOutput() { + return delegate.bytesOutput(); + } + + @Override + public RestRequest request() { + return delegate.request(); + } + + @Override + public boolean detailedErrorsEnabled() { + return delegate.detailedErrorsEnabled(); + } + + @Override + public void sendResponse(RestResponse response) { + close(); + + // Check if subscribe() is already called, the headers and status are going to be sent + // over so we need to populate those **before** that, if possible. + if (subscribed.get() == false) { + prepareResponse(response.status(), Map.of("Content-Type", List.of(response.contentType()))); + Mono.ignoreElements(this).then(Mono.just(response)).subscribe(delegate::sendResponse); + } + } + + @Override + public void sendChunk(HttpChunk chunk) { + delegate.sendChunk(chunk); + } + + @Override + public void prepareResponse(RestStatus status, Map> headers) { + delegate.prepareResponse(status, headers); + } + + @Override + public void subscribe(Subscriber subscriber) { + subscribed.set(true); + delegate.subscribe(subscriber); + } + + private void close() { + // attempt to close once atomically + if (closed.compareAndSet(false, true) == false) { + throw new IllegalStateException("Channel is already closed"); + } + inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength); + } + @Override + public boolean isReadable() { + return delegate.isReadable(); + } + + @Override + public boolean isWritable() { + return delegate.isWritable(); + } } private static CircuitBreaker inFlightRequestsBreaker(CircuitBreakerService circuitBreakerService) { diff --git a/server/src/main/java/org/opensearch/rest/RestHandler.java b/server/src/main/java/org/opensearch/rest/RestHandler.java index 877afdd951088..1139e5fc65f31 100644 --- a/server/src/main/java/org/opensearch/rest/RestHandler.java +++ b/server/src/main/java/org/opensearch/rest/RestHandler.java @@ -72,6 +72,14 @@ default boolean supportsContentStream() { return false; } + /** + * Indicates if the RestHandler supports request / response streaming. Please note that the transport engine has to support + * streaming as well. + */ + default boolean supportsStreaming() { + return false; + } + /** * Indicates if the RestHandler supports working with pooled buffers. If the request handler will not escape the return * {@link RestRequest#content()} or any buffers extracted from it then there is no need to make a copies of any pooled buffers in the diff --git a/server/src/main/java/org/opensearch/rest/StreamingRestChannel.java b/server/src/main/java/org/opensearch/rest/StreamingRestChannel.java new file mode 100644 index 0000000000000..9460fa5a226c2 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/StreamingRestChannel.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.http.HttpChunk; + +import java.util.List; +import java.util.Map; + +import org.reactivestreams.Publisher; + +/** + * A streaming channel used to prepare response and sending the response in chunks. + * + * @opensearch.experimental + */ +@ExperimentalApi +public interface StreamingRestChannel extends RestChannel, Publisher { + /** + * Sends the next {@link HttpChunk} to the response stream + * @param chunk response chunk + */ + void sendChunk(HttpChunk chunk); + + /** + * Prepares response before kicking of content streaming + * @param status response status + * @param headers response headers + */ + void prepareResponse(RestStatus status, Map> headers); + + /** + * Returns {@code true} is this channel is ready for streaming request data, {@code false} otherwise + * @return {@code true} is this channel is ready for streaming request data, {@code false} otherwise + */ + boolean isReadable(); + + /** + * Returns {@code true} is this channel is ready for streaming response data, {@code false} otherwise + * @return {@code true} is this channel is ready for streaming response data, {@code false} otherwise + */ + boolean isWritable(); +} diff --git a/server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java b/server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java new file mode 100644 index 0000000000000..ce6e32a7824c9 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java @@ -0,0 +1,199 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.document; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.bulk.BulkItemResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.bulk.BulkShardRequest; +import org.opensearch.action.support.ActiveShardCount; +import org.opensearch.client.Requests; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.support.XContentHttpChunk; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.MediaType; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.StreamingRestChannel; +import org.opensearch.search.fetch.subphase.FetchSourceContext; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.POST; +import static org.opensearch.rest.RestRequest.Method.PUT; + +/** + *
+ * { "index" : { "_index" : "test", "_id" : "1" }
+ * { "type1" : { "field1" : "value1" } }
+ * { "delete" : { "_index" : "test", "_id" : "2" } }
+ * { "create" : { "_index" : "test", "_id" : "1" }
+ * { "type1" : { "field1" : "value1" } }
+ * 
+ * + * @opensearch.api + */ +public class RestBulkStreamingAction extends BaseRestHandler { + private static final BulkResponse EMPTY = new BulkResponse(new BulkItemResponse[0], 0L); + private final boolean allowExplicitIndex; + + public RestBulkStreamingAction(Settings settings) { + this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); + } + + @Override + public List routes() { + return unmodifiableList( + asList( + new Route(POST, "/_bulk/stream"), + new Route(PUT, "/_bulk/stream"), + new Route(POST, "/{index}/_bulk/stream"), + new Route(PUT, "/{index}/_bulk/stream") + ) + ); + } + + @Override + public String getName() { + return "streaming_bulk_action"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + final String defaultIndex = request.param("index"); + final String defaultRouting = request.param("routing"); + final String defaultPipeline = request.param("pipeline"); + final String waitForActiveShards = request.param("wait_for_active_shards"); + final Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null); + final TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT); + final String refresh = request.param("refresh"); + + final StreamingRestChannelConsumer consumer = (channel) -> { + final MediaType mediaType = request.getMediaType(); + + // Set the content type and the status code before sending the response stream over + channel.prepareResponse(RestStatus.OK, Map.of("Content-Type", List.of(mediaType.mediaTypeWithoutParameters()))); + + // This is initial implementation at the moment which transforms each single request stream chunk into + // individual bulk request and streams each response back. Another source of inefficiency comes from converting + // bulk response from raw (json/yaml/...) to model and back to raw (json/yaml/...). + + // TODOs: + // - add batching (by interval and/or count) + // - eliminate serialization inefficiencies + Flux.from(channel).map(chunk -> { + FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request); + BulkRequest bulkRequest = Requests.bulkRequest(); + if (waitForActiveShards != null) { + bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); + } + + bulkRequest.timeout(timeout); + bulkRequest.setRefreshPolicy(refresh); + + try { + bulkRequest.add( + chunk.content(), + defaultIndex, + defaultRouting, + defaultFetchSourceContext, + defaultPipeline, + defaultRequireAlias, + allowExplicitIndex, + request.getMediaType() + ); + } catch (final IOException ex) { + throw new UncheckedIOException(ex); + } + + return Tuple.tuple(chunk.isLast(), bulkRequest); + }).flatMap(tuple -> { + final CompletableFuture f = new CompletableFuture<>(); + + if (tuple.v2().requests().isEmpty()) { + // this is the last request with no items + f.complete(EMPTY); + } else { + client.bulk(tuple.v2(), new ActionListener() { + @Override + public void onResponse(BulkResponse response) { + f.complete(response); + } + + @Override + public void onFailure(Exception ex) { + f.completeExceptionally(ex); + } + }); + + if (tuple.v1() == true /* last chunk */ ) { + return Flux.just(f, CompletableFuture.completedFuture(EMPTY)); + } + } + + return Mono.just(f); + }).concatMap(f -> Mono.fromFuture(f).doOnNext(r -> { + try { + if (r == EMPTY) { + channel.sendChunk(XContentHttpChunk.last()); + } else { + try (XContentBuilder builder = channel.newBuilder(mediaType, true)) { + channel.sendChunk(XContentHttpChunk.from(r.toXContent(builder, ToXContent.EMPTY_PARAMS))); + } + } + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + })).subscribe(); + }; + + return channel -> { + if (channel instanceof StreamingRestChannel) { + consumer.accept((StreamingRestChannel) channel); + } else { + final ActionRequestValidationException validationError = new ActionRequestValidationException(); + validationError.addValidationError("Unable to initiate request / response streaming over non-streaming channel"); + channel.sendResponse(new BytesRestResponse(channel, validationError)); + } + }; + } + + @Override + public boolean supportsContentStream() { + return true; + } + + @Override + public boolean supportsStreaming() { + return true; + } + + @Override + public boolean allowsUnsafeBuffers() { + return true; + } +} diff --git a/server/src/test/java/org/opensearch/rest/action/document/RestBulkStreamingActionTests.java b/server/src/test/java/org/opensearch/rest/action/document/RestBulkStreamingActionTests.java new file mode 100644 index 0000000000000..aa52ffe08927c --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/document/RestBulkStreamingActionTests.java @@ -0,0 +1,89 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.document; + +import org.opensearch.Version; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.SetOnce; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.client.NoOpNodeClient; +import org.opensearch.test.rest.FakeRestRequest; + +import java.util.HashMap; +import java.util.Map; + +import org.mockito.ArgumentCaptor; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link RestBulkStreamingAction}. + */ +public class RestBulkStreamingActionTests extends OpenSearchTestCase { + public void testBulkStreamingPipelineUpsert() throws Exception { + SetOnce bulkCalled = new SetOnce<>(); + try (NodeClient verifyingClient = new NoOpNodeClient(this.getTestName()) { + @Override + public void bulk(BulkRequest request, ActionListener listener) { + bulkCalled.set(true); + } + }) { + final Map params = new HashMap<>(); + params.put("pipeline", "timestamps"); + + final FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk/streaming") + .withParams(params) + .withContent( + new BytesArray( + "{\"index\":{\"_id\":\"1\"}}\n" + + "{\"field1\":\"val1\"}\n" + + "{\"update\":{\"_id\":\"2\"}}\n" + + "{\"script\":{\"source\":\"ctx._source.counter++;\"},\"upsert\":{\"field1\":\"upserted_val\"}}\n" + ), + MediaTypeRegistry.JSON + ) + .withMethod(RestRequest.Method.POST) + .build(); + request.param("error_trace", "false"); + request.param("rest.exception.stacktrace.skip", "false"); + + final RestChannel channel = mock(RestChannel.class); + when(channel.request()).thenReturn(request); + when(channel.newErrorBuilder()).thenReturn(XContentType.YAML.contentBuilder()); + when(channel.detailedErrorsEnabled()).thenReturn(true); + + new RestBulkStreamingAction(settings(Version.CURRENT).build()).handleRequest(request, channel, verifyingClient); + + final ArgumentCaptor responseCaptor = ArgumentCaptor.captor(); + verify(channel).sendResponse(responseCaptor.capture()); + + // We do not expect `bulk` action to be called since the default HTTP transport (netty4) does not support streaming + assertThat(bulkCalled.get(), equalTo(null)); + assertThat(responseCaptor.getValue().status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat( + responseCaptor.getValue().content().utf8ToString(), + containsString("Unable to initiate request / response streaming") + ); + } + } +}