Skip to content

Commit

Permalink
Minor changes to code layout for readability
Browse files: browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Jun 14, 2024
1 parent 61a1872 commit 056d706
Showing 1 changed file with 34 additions and 33 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -191,15 +191,7 @@ private Dictionary getDictionary(final SeekableChannelContext channelContext) {
} else {
return NULL_DICTIONARY;
}
// Use the context object provided by the caller, or create (and close) a new one
try (
final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext);
final SeekableByteChannel ch =
channelsProvider.getReadChannel(holder.get(), getURI()).position(dictionaryPageOffset)) {
return readDictionary(ch, holder.get());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return readDictionary(dictionaryPageOffset, channelContext);
}

@Override
Expand All @@ -218,31 +210,37 @@ public SeekableChannelsProvider getChannelsProvider() {
}

@NotNull
private Dictionary readDictionary(SeekableByteChannel ch, SeekableChannelContext channelContext)
throws IOException {
// explicitly not closing this, caller is responsible
final PageHeader pageHeader = readPageHeader(ch);
if (pageHeader.getType() != PageType.DICTIONARY_PAGE) {
// In case our fallback in getDictionary was too optimistic...
return NULL_DICTIONARY;
}
final DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header();
final int compressedPageSize = pageHeader.getCompressed_page_size();
final BytesInput payload;
try (final InputStream in = (compressedPageSize == 0) ? null
: SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch, compressedPageSize)) {
if (compressedPageSize == 0) {
// Sometimes the size is explicitly empty, just use an empty payload
payload = BytesInput.empty();
} else {
payload = decompressor.decompress(in, compressedPageSize, pageHeader.getUncompressed_page_size(),
channelContext);
private Dictionary readDictionary(long dictionaryPageOffset, SeekableChannelContext channelContext) {
// Use the context object provided by the caller, or create (and close) a new one
try (
final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext);
final SeekableByteChannel ch =
channelsProvider.getReadChannel(holder.get(), getURI()).position(dictionaryPageOffset)) {
final PageHeader pageHeader = readPageHeader(ch);
if (pageHeader.getType() != PageType.DICTIONARY_PAGE) {
// In case our fallback in getDictionary was too optimistic...
return NULL_DICTIONARY;
}
final DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header();
final int compressedPageSize = pageHeader.getCompressed_page_size();
final BytesInput payload;
try (final InputStream in = (compressedPageSize == 0) ? null
: channelsProvider.getInputStream(ch, compressedPageSize)) {
if (compressedPageSize == 0) {
// Sometimes the size is explicitly empty, just use an empty payload
payload = BytesInput.empty();
} else {
payload = decompressor.decompress(in, compressedPageSize, pageHeader.getUncompressed_page_size(),
holder.get());
}
final Encoding encoding = Encoding.valueOf(dictHeader.getEncoding().name());
final DictionaryPage dictionaryPage = new DictionaryPage(payload, dictHeader.getNum_values(), encoding);
// We are safe to not copy the payload because the Dictionary doesn't hold a reference to dictionaryPage
// or payload and thus doesn't hold a reference to the input stream.
return encoding.initDictionary(path, dictionaryPage);
}
final Encoding encoding = Encoding.valueOf(dictHeader.getEncoding().name());
final DictionaryPage dictionaryPage = new DictionaryPage(payload, dictHeader.getNum_values(), encoding);
// We are safe to not copy the payload because the Dictionary doesn't hold a reference to dictionaryPage or
// payload and thus doesn't hold a reference to the input stream.
return encoding.initDictionary(path, dictionaryPage);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

Expand Down Expand Up @@ -318,6 +316,9 @@ private org.apache.parquet.format.Encoding getEncoding(final PageHeader pageHead
}
}

/**
* Read the page header from the given channel and increment the channel position by the number of bytes read.
*/
private PageHeader readPageHeader(final SeekableByteChannel ch) throws IOException {
// We expect page headers to be smaller than 128 bytes
try (final InputStream in = SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch, 128)) {
Expand Down

0 comments on commit 056d706

Please sign in to comment.