-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Streaming Indexing] Enhance RestAction with request / response strea…
…ming support Signed-off-by: Andriy Redko <[email protected]>
- Loading branch information
Showing
20 changed files
with
855 additions
and
46 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
27 changes: 27 additions & 0 deletions
27
...rt-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/HttpContentSender.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.http.reactor.netty4; | ||
|
||
import org.opensearch.core.action.ActionListener; | ||
|
||
import io.netty.handler.codec.http.HttpContent; | ||
import org.reactivestreams.Publisher; | ||
|
||
/** | ||
* The generic interface for chunked {@link HttpContent} producers (response streaming). | ||
*/ | ||
interface HttpContentSender extends Publisher<HttpContent> { | ||
/** | ||
* Sends the next {@link HttpContent} over the wire | ||
* @param content next {@link HttpContent} | ||
* @param listener action listener | ||
* @param isLast {@code true} if this is the last chunk, {@code false} otherwise | ||
*/ | ||
void send(HttpContent content, ActionListener<Void> listener, boolean isLast); | ||
} |
53 changes: 53 additions & 0 deletions
53
...actor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.http.reactor.netty4; | ||
|
||
import org.opensearch.core.common.bytes.BytesReference; | ||
import org.opensearch.http.HttpChunk; | ||
import org.opensearch.transport.reactor.netty4.Netty4Utils; | ||
|
||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
import io.netty.buffer.ByteBuf; | ||
|
||
class ReactorNetty4HttpChunk implements HttpChunk { | ||
private final AtomicBoolean released; | ||
private final boolean pooled; | ||
private final ByteBuf content; | ||
private final boolean last; | ||
|
||
ReactorNetty4HttpChunk(ByteBuf content, boolean last) { | ||
this(new AtomicBoolean(false), true, content, last); | ||
} | ||
|
||
private ReactorNetty4HttpChunk(AtomicBoolean released, boolean pooled, ByteBuf content, boolean last) { | ||
this.content = content; | ||
this.pooled = pooled; | ||
this.released = released; | ||
this.last = last; | ||
} | ||
|
||
@Override | ||
public BytesReference content() { | ||
assert released.get() == false; | ||
return Netty4Utils.toBytesReference(content); | ||
} | ||
|
||
@Override | ||
public void release() { | ||
if (pooled && released.compareAndSet(false, true)) { | ||
content.release(); | ||
} | ||
} | ||
|
||
@Override | ||
public boolean isLast() { | ||
return last; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
118 changes: 118 additions & 0 deletions
118
...4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.http.reactor.netty4; | ||
|
||
import org.opensearch.common.concurrent.CompletableContext; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.core.common.bytes.BytesReference; | ||
import org.opensearch.http.HttpChunk; | ||
import org.opensearch.http.HttpResponse; | ||
import org.opensearch.http.StreamingHttpChannel; | ||
import org.opensearch.transport.reactor.netty4.Netty4Utils; | ||
|
||
import java.net.InetSocketAddress; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import io.netty.buffer.Unpooled; | ||
import io.netty.handler.codec.http.DefaultHttpContent; | ||
import io.netty.handler.codec.http.FullHttpResponse; | ||
import io.netty.handler.codec.http.HttpContent; | ||
import org.reactivestreams.Publisher; | ||
import org.reactivestreams.Subscriber; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.FluxSink; | ||
import reactor.netty.http.server.HttpServerRequest; | ||
import reactor.netty.http.server.HttpServerResponse; | ||
|
||
class ReactorNetty4StreamingHttpChannel implements StreamingHttpChannel { | ||
private final HttpServerRequest request; | ||
private final HttpServerResponse response; | ||
private final CompletableContext<Void> closeContext = new CompletableContext<>(); | ||
private final Publisher<HttpChunk> receiver; | ||
private final HttpContentSender sender; | ||
private volatile FluxSink<HttpChunk> producer; | ||
private volatile boolean lastChunkReceived = false; | ||
|
||
ReactorNetty4StreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, HttpContentSender sender) { | ||
this.request = request; | ||
this.response = response; | ||
this.sender = sender; | ||
this.receiver = Flux.create(producer -> this.producer = producer); | ||
this.request.withConnection(connection -> Netty4Utils.addListener(connection.channel().closeFuture(), closeContext)); | ||
} | ||
|
||
@Override | ||
public boolean isOpen() { | ||
return true; | ||
} | ||
|
||
@Override | ||
public void close() { | ||
request.withConnection(connection -> connection.channel().close()); | ||
} | ||
|
||
@Override | ||
public void addCloseListener(ActionListener<Void> listener) { | ||
closeContext.addListener(ActionListener.toBiConsumer(listener)); | ||
} | ||
|
||
@Override | ||
public void sendChunk(HttpChunk chunk, ActionListener<Void> listener) { | ||
sender.send(createContent(chunk), listener, chunk.isLast()); | ||
} | ||
|
||
@Override | ||
public void sendResponse(HttpResponse response, ActionListener<Void> listener) { | ||
sender.send(createContent(response), listener, true); | ||
} | ||
|
||
@Override | ||
public void prepareResponse(int status, Map<String, List<String>> headers) { | ||
this.response.status(status); | ||
headers.forEach((k, vs) -> vs.forEach(v -> this.response.addHeader(k, v))); | ||
} | ||
|
||
@Override | ||
public InetSocketAddress getRemoteAddress() { | ||
return (InetSocketAddress) response.remoteAddress(); | ||
} | ||
|
||
@Override | ||
public InetSocketAddress getLocalAddress() { | ||
return (InetSocketAddress) response.hostAddress(); | ||
} | ||
|
||
@Override | ||
public void receiveChunk(HttpChunk message) { | ||
if (lastChunkReceived) { | ||
return; | ||
} | ||
|
||
producer.next(message); | ||
if (message.isLast()) { | ||
lastChunkReceived = true; | ||
producer.complete(); | ||
} | ||
} | ||
|
||
@Override | ||
public void subscribe(Subscriber<? super HttpChunk> subscriber) { | ||
receiver.subscribe(subscriber); | ||
} | ||
|
||
private static HttpContent createContent(HttpResponse response) { | ||
final FullHttpResponse fullHttpResponse = (FullHttpResponse) response; | ||
return new DefaultHttpContent(fullHttpResponse.content()); | ||
} | ||
|
||
private static HttpContent createContent(HttpChunk chunk) { | ||
return new DefaultHttpContent(Unpooled.copiedBuffer(BytesReference.toBytes(chunk.content()))); | ||
} | ||
} |
Oops, something went wrong.