diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharDataConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharDataConsumer.java index 48a2498c4..74a2c4c3a 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharDataConsumer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharDataConsumer.java @@ -57,6 +57,7 @@ public abstract class AbstractCharDataConsumer implements AsyncDataConsumer { private volatile Charset charset; private volatile CharsetDecoder charsetDecoder; + private volatile ByteBuffer byteBuffer; protected AbstractCharDataConsumer(final int bufSize, final CharCodingConfig charCodingConfig) { this.charBuffer = CharBuffer.allocate(Args.positive(bufSize, "Buffer size")); @@ -133,8 +134,35 @@ private CharsetDecoder getCharsetDecoder() { public final void consume(final ByteBuffer src) throws IOException { final CharsetDecoder charsetDecoder = getCharsetDecoder(); while (src.hasRemaining()) { - checkResult(charsetDecoder.decode(src, charBuffer, false)); - doDecode(false); + if (byteBuffer != null && byteBuffer.position() > 0) { + // There are some left-overs from the previous input operation + final int n = byteBuffer.remaining(); + if (n < src.remaining()) { + final int oldLimit = src.limit(); + src.limit(src.position() + n); + byteBuffer.put(src); + src.limit(oldLimit); + } else { + byteBuffer.put(src); + } + byteBuffer.flip(); + final CoderResult r = charsetDecoder.decode(byteBuffer, charBuffer, false); + checkResult(r); + doDecode(false); + byteBuffer.compact(); + } + if (byteBuffer == null || byteBuffer.position() == 0) { + final CoderResult r = charsetDecoder.decode(src, charBuffer, false); + checkResult(r); + doDecode(false); + if (r.isUnderflow() && src.hasRemaining()) { + // in case of input underflow src can be expected to be very small (one incomplete UTF8 char) + if (byteBuffer == null) { + byteBuffer = ByteBuffer.allocate(Math.max(src.remaining(), 1024)); + } + byteBuffer.put(src); + } + } } } diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractCharAsyncEntityConsumer.java b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractCharAsyncEntityConsumer.java index 6a4ec74bc..d5eb25e89 100644 --- a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractCharAsyncEntityConsumer.java +++ b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractCharAsyncEntityConsumer.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.CharBuffer; +import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicLong; import org.apache.hc.core5.concurrent.FutureCallback; @@ -37,6 +38,7 @@ import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.impl.BasicEntityDetails; import org.apache.hc.core5.http.nio.AsyncEntityConsumer; +import org.apache.hc.core5.util.ByteArrayBuffer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -113,4 +115,54 @@ public void cancelled() { Assertions.assertEquals(1L, count.longValue()); } + @Test + public void testConsumeIncompleteData() throws Exception { + + final AsyncEntityConsumer consumer = new StringBuilderAsyncEntityConsumer(); + + final AtomicLong count = new AtomicLong(0); + consumer.streamStart(new BasicEntityDetails(-1, ContentType.TEXT_PLAIN.withCharset(StandardCharsets.UTF_8)), new FutureCallback() { + + @Override + public void completed(final String result) { + count.incrementAndGet(); + } + + @Override + public void failed(final Exception ex) { + count.incrementAndGet(); + } + + @Override + public void cancelled() { + count.incrementAndGet(); + } + + }); + + final byte[] stuff = "stuff".getBytes(StandardCharsets.UTF_8); + final byte[] splitCharacter = "£".getBytes(StandardCharsets.UTF_8); + + final ByteArrayBuffer b1 = new ByteArrayBuffer(1024); + b1.append(stuff, 0, stuff.length); + b1.append(splitCharacter, 0, 1); + consumer.consume(ByteBuffer.wrap(b1.toByteArray())); + + final ByteArrayBuffer b2 = new ByteArrayBuffer(1024); + b2.append(splitCharacter, 1, 1); + b2.append(stuff, 0, stuff.length); + b2.append(splitCharacter, 0, 1); + consumer.consume(ByteBuffer.wrap(b2.toByteArray())); + + final ByteArrayBuffer b3 = new ByteArrayBuffer(1024); + b3.append(splitCharacter, 1, 1); + b3.append(stuff, 0, stuff.length); + consumer.consume(ByteBuffer.wrap(b3.toByteArray())); + + consumer.streamEnd(null); + + Assertions.assertEquals("stuff£stuff£stuff", consumer.getContent()); + Assertions.assertEquals(1L, count.longValue()); + } + }