Skip to content

Commit

Permalink
fix a broken test
Browse files Browse the repository at this point in the history
  • Loading branch information
ikhoon committed Aug 5, 2024
1 parent b68917c commit 86ea2cc
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.internal.common.stream.StreamMessageUtil;
import com.linecorp.armeria.unsafe.PooledObjects;

import io.netty.util.concurrent.EventExecutor;
Expand Down Expand Up @@ -219,7 +218,6 @@ public void onNext(T o) {
try {
filtered = filter(o);
} catch (Throwable ex) {
StreamMessageUtil.closeOrAbort(o);
// onError(ex) should be called before upstream.cancel() to deliver the cause to downstream.
// upstream.cancel() and make downstream closed with CancelledSubscriptionException
// before sending the actual cause.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ protected HttpObject filter(HttpObject obj) {
encodedBuf.readerIndex(encodedBuf.writerIndex());
return httpData;
} catch (IOException e) {
// An unreleased ByteBuf will be released by `beforeError()`
// An unreleased ByteBuf in `encodedStream` will be released by `beforeError()`
throw new IllegalStateException(
"Error encoding HttpData, this should not happen with byte arrays.",
e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletionException;
Expand All @@ -30,6 +32,7 @@
import org.reactivestreams.Subscription;

import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.AggregationOptions;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpObject;
import com.linecorp.armeria.common.HttpResponse;
Expand All @@ -43,17 +46,19 @@
import com.linecorp.armeria.common.stream.NoopSubscriber;
import com.linecorp.armeria.common.stream.SubscriptionOption;
import com.linecorp.armeria.internal.common.encoding.StreamEncoderFactories;
import com.linecorp.armeria.internal.common.encoding.StreamEncoderFactory;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.ImmediateEventExecutor;
import reactor.test.StepVerifier;

class HttpEncodedResponseTest {

@Test
void testLeak() {
void testLeakOnSubscribe() {
final ByteBuf buf = Unpooled.directBuffer();
buf.writeCharSequence("foo", StandardCharsets.UTF_8);

Expand All @@ -71,6 +76,43 @@ void testLeak() {
assertThat(buf.refCnt()).isZero();
}

@Test
void testLeakOnError() {
final ByteBuf buf = Unpooled.directBuffer();
buf.writeCharSequence("foo", StandardCharsets.UTF_8);

final HttpResponse orig = HttpResponse.of(HttpStatus.OK,
MediaType.PLAIN_TEXT_UTF_8,
HttpData.wrap(buf).withEndOfStream());
final StreamEncoderFactory throwingEncoderFactory = new StreamEncoderFactory() {
@Override
public String encodingHeaderValue() {
return "gzip";
}

@Override
public OutputStream newEncoder(ByteBufOutputStream os) {
return new OutputStream() {
@Override
public void write(int b) throws IOException {
throw new IOException("Oops");
}
};
}
};
final HttpEncodedResponse encoded = new HttpEncodedResponse(
orig, throwingEncoderFactory, mediaType -> true, ByteBufAllocator.DEFAULT, 1);

assertThatThrownBy(() -> {
encoded.aggregate(AggregationOptions.usePooledObjects(ByteBufAllocator.DEFAULT)).join();
}).isInstanceOf(CompletionException.class)
.hasCauseInstanceOf(IllegalStateException.class)
.hasRootCauseInstanceOf(IOException.class);

// 'buf' should be released.
assertThat(buf.refCnt()).isZero();
}

@Test
void shouldReleaseEncodedStreamOnCancel() {
final HttpResponse orig =
Expand Down

0 comments on commit 86ea2cc

Please sign in to comment.