diff --git a/src/main/java/com/amazon/ion/impl/IonCursorBinary.java b/src/main/java/com/amazon/ion/impl/IonCursorBinary.java index 813d8e0100..c32e23bf6a 100644 --- a/src/main/java/com/amazon/ion/impl/IonCursorBinary.java +++ b/src/main/java/com/amazon/ion/impl/IonCursorBinary.java @@ -699,17 +699,22 @@ private boolean slowSeek(long numberOfBytes) { return false; } offset = limit; + long shortfall; long skipped = 0; - try { - skipped = refillableState.inputStream.skip(unbufferedBytesToSkip); - } catch (EOFException e) { - // Certain InputStream implementations (e.g. GZIPInputStream) throw EOFException if more bytes are requested - // to skip than are currently available (e.g. if a header or trailer is incomplete). - } catch (IOException e) { - throwAsIonException(e); - } - refillableState.totalDiscardedBytes += skipped; - long shortfall = numberOfBytes - (skipped + size); + do { + try { + skipped = refillableState.inputStream.skip(unbufferedBytesToSkip); + } catch (EOFException e) { + // Certain InputStream implementations (e.g. GZIPInputStream) throw EOFException if more bytes are requested + // to skip than are currently available (e.g. if a header or trailer is incomplete). + skipped = 0; + } catch (IOException e) { + throwAsIonException(e); + } + refillableState.totalDiscardedBytes += skipped; + shortfall = unbufferedBytesToSkip - skipped; + unbufferedBytesToSkip = shortfall; + } while (shortfall > 0 && skipped > 0); if (shortfall <= 0) { refillableState.bytesRequested = 0; refillableState.state = State.READY; diff --git a/src/test/java/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java b/src/test/java/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java index 3abc60748a..228d26ca84 100644 --- a/src/test/java/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java +++ b/src/test/java/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java @@ -38,7 +38,6 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.EOFException; -import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.math.BigDecimal; @@ -52,7 +51,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Supplier; import java.util.zip.GZIPInputStream; import static com.amazon.ion.BitUtils.bytes; @@ -3376,6 +3374,11 @@ public int read(byte[] b, int off, int len) throws IOException { @Override public long skip(long len) { int numberOfBytesToSkip = calculateNumberOfBytesToReturn((int) len); + if (numberOfBytesToSkip < 0) { + // The contract for InputStream.skip() does not use -1 to indicate EOF. It always returns the actual + // number of bytes skipped. + return 0; + } offset += numberOfBytesToSkip; return numberOfBytesToSkip; } @@ -3399,10 +3402,14 @@ public void shouldNotFailWhenAnInputStreamProvidesFewerBytesThanRequestedWithout reader.close(); } - @Test - public void shouldNotFailWhenAnInputStreamProvidesFewerBytesThanRequestedWithoutReachingEofAndTheReaderSkipsTheValue() throws Exception { - reader = boundedReaderFor(new ThrottlingInputStream(bytes(0xE0, 0x01, 0x00, 0xEA, 0x89, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 0x20), false), 8, 8, byteAndOversizedValueCountingHandler); + @ParameterizedTest(name = "throwOnEof={0}") + @ValueSource(booleans = {true, false}) + public void shouldNotFailWhenAnInputStreamProvidesFewerBytesThanRequestedWithoutReachingEofAndTheIncrementalReaderSkipsTheValue(boolean throwOnEof) throws Exception { + readerBuilder = readerBuilder.withIncrementalReadingEnabled(true); + reader = boundedReaderFor(new ThrottlingInputStream(bytes(0xE0, 0x01, 0x00, 0xEA, 0x89, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 0x20), throwOnEof), 8, 8, byteAndOversizedValueCountingHandler); assertSequence( + // The string value is oversized, and is automatically skipped to comply with the incremental reader's + // guarantee that next() results in a fully-buffered top-level value. next(IonType.INT), intValue(0), next(null) ); @@ -3410,6 +3417,44 @@ public void shouldNotFailWhenAnInputStreamProvidesFewerBytesThanRequestedWithout assertEquals(1, oversizedCounter.get()); } + @ParameterizedTest(name = "throwOnEof={0}") + @ValueSource(booleans = {true, false}) + public void shouldNotFailWhenAnInputStreamProvidesFewerBytesThanRequestedWithoutReachingEofAndTheNonIcrementalReaderSkipsTheValue(boolean throwOnEof) throws Exception { + readerBuilder = readerBuilder.withIncrementalReadingEnabled(false); + reader = boundedReaderFor(new ThrottlingInputStream(bytes(0xE0, 0x01, 0x00, 0xEA, 0x89, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 0x20), throwOnEof), 8, 8, byteAndOversizedValueCountingHandler); + assertSequence( + // The string value is oversized, but since the reader is non-incremental, an error is only surfaced if the + // user tries to materialize it. + next(IonType.STRING), + next(IonType.INT), intValue(0), + next(null) + ); + reader.close(); + assertEquals(0, oversizedCounter.get()); + } + + @ParameterizedTest(name = "incrementalReadingEnabled={0},throwOnEof={1}") + @CsvSource({ + "true, true", + "true, false", + "false, true", + "false, false" + }) + public void shouldNotFailWhenAnInputStreamProvidesFewerBytesThanRequestedWithoutReachingEofWithAnUnboundedBufferAndTheReaderSkipsTheValue(boolean incrementalReadingEnabled, boolean throwOnEof) throws Exception { + readerBuilder = readerBuilder.withIncrementalReadingEnabled(incrementalReadingEnabled); + // The reader's buffer size is unbounded, but the string value is larger than the initial buffer size. Skipping + // the value therefore causes some of its bytes to be skipped without ever being buffered, by calling + // InputStream.skip(). + reader = boundedReaderFor(new ThrottlingInputStream(bytes(0xE0, 0x01, 0x00, 0xEA, 0x89, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 0x20), throwOnEof), 6, Integer.MAX_VALUE, byteAndOversizedValueCountingHandler); + assertSequence( + next(IonType.STRING), // skip the string value, causing unbuffered bytes to be skipped + next(IonType.INT), intValue(0), + next(null) + ); + reader.close(); + assertEquals(0, oversizedCounter.get()); + } + @Test public void shouldNotFailWhenGZIPBoundaryIsEncounteredInStringValue() throws Exception { ResizingPipedInputStream pipe = new ResizingPipedInputStream(128);