diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index 2529c50fa10..d716f97e16b 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -191,15 +191,7 @@ private Dictionary getDictionary(final SeekableChannelContext channelContext) { } else { return NULL_DICTIONARY; } - // Use the context object provided by the caller, or create (and close) a new one - try ( - final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext); - final SeekableByteChannel ch = - channelsProvider.getReadChannel(holder.get(), getURI()).position(dictionaryPageOffset)) { - return readDictionary(ch, holder.get()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + return readDictionary(dictionaryPageOffset, channelContext); } @Override @@ -218,31 +210,37 @@ public SeekableChannelsProvider getChannelsProvider() { } @NotNull - private Dictionary readDictionary(SeekableByteChannel ch, SeekableChannelContext channelContext) - throws IOException { - // explicitly not closing this, caller is responsible - final PageHeader pageHeader = readPageHeader(ch); - if (pageHeader.getType() != PageType.DICTIONARY_PAGE) { - // In case our fallback in getDictionary was too optimistic... - return NULL_DICTIONARY; - } - final DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header(); - final int compressedPageSize = pageHeader.getCompressed_page_size(); - final BytesInput payload; - try (final InputStream in = (compressedPageSize == 0) ? null - : SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch, compressedPageSize)) { - if (compressedPageSize == 0) { - // Sometimes the size is explicitly empty, just use an empty payload - payload = BytesInput.empty(); - } else { - payload = decompressor.decompress(in, compressedPageSize, pageHeader.getUncompressed_page_size(), - channelContext); + private Dictionary readDictionary(long dictionaryPageOffset, SeekableChannelContext channelContext) { + // Use the context object provided by the caller, or create (and close) a new one + try ( + final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext); + final SeekableByteChannel ch = + channelsProvider.getReadChannel(holder.get(), getURI()).position(dictionaryPageOffset)) { + final PageHeader pageHeader = readPageHeader(ch); + if (pageHeader.getType() != PageType.DICTIONARY_PAGE) { + // In case our fallback in getDictionary was too optimistic... + return NULL_DICTIONARY; + } + final DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header(); + final int compressedPageSize = pageHeader.getCompressed_page_size(); + final BytesInput payload; + try (final InputStream in = (compressedPageSize == 0) ? null + : channelsProvider.getInputStream(ch, compressedPageSize)) { + if (compressedPageSize == 0) { + // Sometimes the size is explicitly empty, just use an empty payload + payload = BytesInput.empty(); + } else { + payload = decompressor.decompress(in, compressedPageSize, pageHeader.getUncompressed_page_size(), + holder.get()); + } + final Encoding encoding = Encoding.valueOf(dictHeader.getEncoding().name()); + final DictionaryPage dictionaryPage = new DictionaryPage(payload, dictHeader.getNum_values(), encoding); + // We are safe to not copy the payload because the Dictionary doesn't hold a reference to dictionaryPage + // or payload and thus doesn't hold a reference to the input stream. + return encoding.initDictionary(path, dictionaryPage); } - final Encoding encoding = Encoding.valueOf(dictHeader.getEncoding().name()); - final DictionaryPage dictionaryPage = new DictionaryPage(payload, dictHeader.getNum_values(), encoding); - // We are safe to not copy the payload because the Dictionary doesn't hold a reference to dictionaryPage or - // payload and thus doesn't hold a reference to the input stream. - return encoding.initDictionary(path, dictionaryPage); + } catch (IOException e) { + throw new UncheckedIOException(e); } } @@ -318,6 +316,9 @@ private org.apache.parquet.format.Encoding getEncoding(final PageHeader pageHead } } + /** + * Read the page header from the given channel and increment the channel position by the number of bytes read. + */ private PageHeader readPageHeader(final SeekableByteChannel ch) throws IOException { // We expect page headers to be smaller than 128 bytes try (final InputStream in = SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch, 128)) {