From 61d76e076c1ea4d476fd2362e83272f961e39665 Mon Sep 17 00:00:00 2001
From: Tyler Gregg
Date: Thu, 2 Nov 2023 17:23:24 -0700
Subject: [PATCH] Handles the case where the binary cursor's InputStream provides fewer bytes than requested before reaching EOF. (#623)

---
 src/com/amazon/ion/impl/IonCursorBinary.java  |  45 +++++---
 ...onReaderContinuableTopLevelBinaryTest.java | 108 ++++++++++++++++++
 2 files changed, 135 insertions(+), 18 deletions(-)

diff --git a/src/com/amazon/ion/impl/IonCursorBinary.java b/src/com/amazon/ion/impl/IonCursorBinary.java
index ed1d9defe7..f657c634fe 100644
--- a/src/com/amazon/ion/impl/IonCursorBinary.java
+++ b/src/com/amazon/ion/impl/IonCursorBinary.java
@@ -524,8 +524,7 @@ private boolean fillAt(long index, long numberOfBytes) {
             refillableState.bytesRequested = numberOfBytes + (index - offset);
             if (ensureCapacity(refillableState.bytesRequested)) {
                 // Fill all the free space, not just the shortfall; this reduces I/O.
-                refill(freeSpaceAt(limit));
-                shortfall = refillableState.bytesRequested - availableAt(offset);
+                shortfall = refill(refillableState.bytesRequested);
             } else {
                 // The request cannot be satisfied, but not because data was unavailable. Return normally; it is the
                 // caller's responsibility to recover.
@@ -646,24 +645,34 @@ private void shiftIndicesLeft(int shiftAmount) {
     }
 
     /**
-     * Fills the buffer with up to the requested number of additional bytes. It is the caller's responsibility to
-     * ensure that there is space in the buffer.
-     * @param numberOfBytesToFill the number of additional bytes to attempt to add to the buffer.
+     * Attempts to fill the buffer with up to the requested number of additional bytes. It is the caller's
+     * responsibility to ensure that there is space in the buffer.
+     * @param minimumNumberOfBytesRequired the minimum number of bytes requested to fill the current value.
+     * @return the shortfall between the number of bytes that were filled and the minimum number requested. If less than
+     * 1, then at least `minimumNumberOfBytesRequired` were filled.
      */
-    private void refill(long numberOfBytesToFill) {
+    private long refill(long minimumNumberOfBytesRequired) {
         int numberOfBytesFilled = -1;
-        try {
-            numberOfBytesFilled = refillableState.inputStream.read(buffer, (int) limit, (int) numberOfBytesToFill);
-        } catch (EOFException e) {
-            // Certain InputStream implementations (e.g. GZIPInputStream) throw EOFException if more bytes are requested
-            // to read than are currently available (e.g. if a header or trailer is incomplete).
-        } catch (IOException e) {
-            throwAsIonException(e);
-        }
-        if (numberOfBytesFilled < 0) {
-            return;
-        }
-        limit += numberOfBytesFilled;
+        long shortfall;
+        // Sometimes an InputStream implementation will return fewer than the number of bytes requested even
+        // if the stream is not at EOF. If this happens and there is still a shortfall, keep requesting bytes
+        // until either the shortfall is filled or EOF is reached.
+        do {
+            try {
+                numberOfBytesFilled = refillableState.inputStream.read(buffer, (int) limit, (int) freeSpaceAt(limit));
+            } catch (EOFException e) {
+                // Certain InputStream implementations (e.g. GZIPInputStream) throw EOFException if more bytes are requested
+                // to read than are currently available (e.g. if a header or trailer is incomplete).
+                numberOfBytesFilled = -1;
+            } catch (IOException e) {
+                throwAsIonException(e);
+            }
+            if (numberOfBytesFilled > 0) {
+                limit += numberOfBytesFilled;
+            }
+            shortfall = minimumNumberOfBytesRequired - availableAt(offset);
+        } while (shortfall > 0 && numberOfBytesFilled >= 0);
+        return shortfall;
     }
 
     /**
diff --git a/test/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java b/test/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java
index ff41c7a785..679286d7bf 100644
--- a/test/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java
+++ b/test/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java
@@ -34,10 +34,12 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.math.BigDecimal;
@@ -3196,6 +3198,112 @@ public void skipWithoutEnoughDataNonIncrementalFails() throws Exception {
         reader.close();
     }
 
+    /**
+     * An InputStream that returns less than the number of bytes requested from bulk reads.
+     */
+    private static class ThrottlingInputStream extends InputStream {
+
+        private final byte[] data;
+        private final boolean throwFromReadOnEof;
+        private int offset = 0;
+
+        /**
+         * @param data the data for the InputStream to provide.
+         * @param throwFromReadOnEof true if the stream should throw {@link java.io.EOFException} when read() is called
+         *                           at EOF. If false, simply returns -1.
+         */
+        protected ThrottlingInputStream(byte[] data, boolean throwFromReadOnEof) {
+            this.data = data;
+            this.throwFromReadOnEof = throwFromReadOnEof;
+        }
+
+        @Override
+        public int read() {
+            return data[offset++] & 0xFF;
+        }
+
+        private int calculateNumberOfBytesToReturn(int numberOfBytesRequested) {
+            int available = data.length - offset;
+            int numberOfBytesToReturn;
+            if (available > 1 && numberOfBytesRequested > 1) {
+                // Return fewer bytes than requested and fewer than are available, avoiding EOF.
+                numberOfBytesToReturn = Math.min(available - 1, numberOfBytesRequested - 1);
+            } else if (available <= 0) {
+                return -1; // EOF
+            } else {
+                // Only 1 byte is available, so return it as long as at least 1 byte was requested.
+                numberOfBytesToReturn = Math.min(numberOfBytesRequested, available);
+            }
+            return numberOfBytesToReturn;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            if (off + len > b.length) {
+                throw new IndexOutOfBoundsException();
+            }
+            int numberOfBytesToReturn = calculateNumberOfBytesToReturn(len);
+            if (numberOfBytesToReturn < 0) {
+                if (throwFromReadOnEof) {
+                    throw new EOFException();
+                }
+                return -1;
+            }
+            System.arraycopy(data, offset, b, off, numberOfBytesToReturn);
+            offset += numberOfBytesToReturn;
+            return numberOfBytesToReturn;
+        }
+
+        @Override
+        public long skip(long len) {
+            int numberOfBytesToSkip = calculateNumberOfBytesToReturn((int) len);
+            offset += numberOfBytesToSkip;
+            return numberOfBytesToSkip;
+        }
+    }
+
+    @ParameterizedTest(name = "incrementalReadingEnabled={0},throwOnEof={1}")
+    @CsvSource({
+        "true, true",
+        "true, false",
+        "false, true",
+        "false, false"
+    })
+    public void shouldNotFailWhenAnInputStreamProvidesFewerBytesThanRequestedWithoutReachingEof(boolean incrementalReadingEnabled, boolean throwOnEof) throws Exception {
+        readerBuilder = readerBuilder.withIncrementalReadingEnabled(incrementalReadingEnabled)
+            .withBufferConfiguration(IonBufferConfiguration.Builder.standard().withInitialBufferSize(8).build());
+        reader = readerFor(new ThrottlingInputStream(bytes(0xE0, 0x01, 0x00, 0xEA, 0x89, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'), throwOnEof));
+        assertSequence(
+            next(IonType.STRING), stringValue("abcdefghi"),
+            next(null)
+        );
+        reader.close();
+    }
+
+    @Test
+    public void shouldNotFailWhenAnInputStreamProvidesFewerBytesThanRequestedWithoutReachingEofAndTheReaderSkipsTheValue() throws Exception {
+        reader = boundedReaderFor(new ThrottlingInputStream(bytes(0xE0, 0x01, 0x00, 0xEA, 0x89, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 0x20), false), 8, 8, byteAndOversizedValueCountingHandler);
+        assertSequence(
+            next(IonType.INT), intValue(0),
+            next(null)
+        );
+        reader.close();
+        assertEquals(1, oversizedCounter.get());
+    }
+
+    @Test
+    public void shouldNotFailWhenGZIPBoundaryIsEncounteredInStringValue() throws Exception {
+        ResizingPipedInputStream pipe = new ResizingPipedInputStream(128);
+        // The following lines create a GZIP payload boundary (trailer/header) in the middle of an Ion string value.
+        pipe.receive(gzippedBytes(0xE0, 0x01, 0x00, 0xEA, 0x89, 'a', 'b'));
+        pipe.receive(gzippedBytes('c', 'd', 'e', 'f', 'g', 'h', 'i'));
+        reader = readerFor(new GZIPInputStream(pipe));
+        assertSequence(
+            next(IonType.STRING), stringValue("abcdefghi"),
+            next(null)
+        );
+    }
+
     @Test
     public void concatenatedAfterGZIPHeader() throws Exception {
         // Tests that a stream that initially contains only a GZIP header can be read successfully if more data