Parse parquet footer row groups lazily
jinyangli34 authored and raunaqmorarka committed Jan 2, 2025
1 parent 1223ce5 commit ca3d6b2
Showing 8 changed files with 107 additions and 40 deletions.
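This commit turns row-group decoding into a lazy step. Previously the static factory ParquetMetadata.createParquetMetadata converted every Thrift RowGroup into BlockMetadata while the footer was being read; now the ParquetMetadata constructor decodes only the schema and key/value metadata, and getBlocks() performs the row-group conversion on demand (and can therefore throw ParquetCorruptionException). Below is a minimal sketch of the resulting call pattern, assuming the import locations visible elsewhere in the diff; the FooterSketch class, the readBlocks helper, and the thriftFooter parameter are illustrative names, not part of the commit.

import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSourceId;
import io.trino.parquet.metadata.BlockMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
import org.apache.parquet.format.FileMetaData;

import java.util.List;

class FooterSketch
{
    // "thriftFooter" is the decoded org.apache.parquet.format.FileMetaData,
    // as produced by readFileMetaData(...) in MetadataReader.readFooter below.
    static List<BlockMetadata> readBlocks(FileMetaData thriftFooter, ParquetDataSourceId dataSourceId)
            throws ParquetCorruptionException
    {
        // The constructor now reads only the schema and key/value metadata.
        ParquetMetadata metadata = new ParquetMetadata(thriftFooter, dataSourceId);
        // Row groups are converted to BlockMetadata here, on demand, which is
        // why getBlocks() now declares ParquetCorruptionException and callers
        // such as ParquetUtil and the Iceberg writer propagate or handle it.
        return metadata.getBlocks();
    }
}

The new @VisibleForTesting getParquetMetadata() accessor exposes the raw Thrift footer; the added testRowGroupOffset test uses it to verify that the writer now sets file_offset on every RowGroup and that it matches BlockMetadata.getStartingPos().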

@@ -13,7 +13,9 @@
  */
 package io.trino.parquet.metadata;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import io.airlift.log.Logger;
 import io.trino.parquet.ParquetCorruptionException;
 import io.trino.parquet.ParquetDataSourceId;
@@ -35,7 +37,6 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -44,52 +45,58 @@
 import java.util.Optional;
 import java.util.Set;
 
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static io.trino.parquet.ParquetMetadataConverter.convertEncodingStats;
 import static io.trino.parquet.ParquetMetadataConverter.getEncoding;
 import static io.trino.parquet.ParquetMetadataConverter.getLogicalTypeAnnotation;
 import static io.trino.parquet.ParquetMetadataConverter.getPrimitive;
 import static io.trino.parquet.ParquetMetadataConverter.toColumnIndexReference;
 import static io.trino.parquet.ParquetMetadataConverter.toOffsetIndexReference;
 import static io.trino.parquet.ParquetValidationUtils.validateParquet;
+import static java.util.Objects.requireNonNull;
 
 public class ParquetMetadata
 {
     private static final Logger log = Logger.get(ParquetMetadata.class);
 
-    private final FileMetadata fileMetaData;
-    private final List<BlockMetadata> blocks;
+    private final FileMetaData parquetMetadata;
+    private final ParquetDataSourceId dataSourceId;
+    private final FileMetadata fileMetadata;
 
-    public ParquetMetadata(FileMetadata fileMetaData, List<BlockMetadata> blocks)
-    {
-        this.fileMetaData = fileMetaData;
-        this.blocks = blocks;
-    }
-
-    public List<BlockMetadata> getBlocks()
+    public ParquetMetadata(FileMetaData parquetMetadata, ParquetDataSourceId dataSourceId)
+            throws ParquetCorruptionException
     {
-        return blocks;
+        this.fileMetadata = new FileMetadata(
+                readMessageType(parquetMetadata, dataSourceId),
+                keyValueMetaData(parquetMetadata),
+                parquetMetadata.getCreated_by());
+        this.parquetMetadata = parquetMetadata;
+        this.dataSourceId = requireNonNull(dataSourceId, "dataSourceId is null");
     }
 
     public FileMetadata getFileMetaData()
     {
-        return fileMetaData;
+        return fileMetadata;
     }
 
     @Override
     public String toString()
     {
-        return "ParquetMetaData{" + fileMetaData + ", blocks: " + blocks + "}";
+        return toStringHelper(this)
+                .add("parquetMetadata", parquetMetadata)
+                .toString();
     }
 
-    public static ParquetMetadata createParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId)
+    public List<BlockMetadata> getBlocks()
             throws ParquetCorruptionException
     {
-        List<SchemaElement> schema = fileMetaData.getSchema();
+        List<SchemaElement> schema = parquetMetadata.getSchema();
         validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty");
 
         MessageType messageType = readParquetSchema(schema);
         List<BlockMetadata> blocks = new ArrayList<>();
-        List<RowGroup> rowGroups = fileMetaData.getRow_groups();
+        List<RowGroup> rowGroups = parquetMetadata.getRow_groups();
         if (rowGroups != null) {
             for (RowGroup rowGroup : rowGroups) {
                 List<ColumnChunk> columns = rowGroup.getColumns();
@@ -114,7 +121,7 @@ public static ParquetMetadata createParquetMetadata
                         CompressionCodecName.fromParquet(metaData.codec),
                         convertEncodingStats(metaData.encoding_stats),
                         readEncodings(metaData.encodings),
-                        MetadataReader.readStats(Optional.ofNullable(fileMetaData.getCreated_by()), Optional.ofNullable(metaData.statistics), primitiveType),
+                        MetadataReader.readStats(Optional.ofNullable(parquetMetadata.getCreated_by()), Optional.ofNullable(metaData.statistics), primitiveType),
                         metaData.data_page_offset,
                         metaData.dictionary_page_offset,
                         metaData.num_values,
@@ -129,18 +136,13 @@ public static ParquetMetadata createParquetMetadata
             }
         }
 
-        Map<String, String> keyValueMetaData = new HashMap<>();
-        List<KeyValue> keyValueList = fileMetaData.getKey_value_metadata();
-        if (keyValueList != null) {
-            for (KeyValue keyValue : keyValueList) {
-                keyValueMetaData.put(keyValue.key, keyValue.value);
-            }
-        }
-        FileMetadata parquetFileMetadata = new FileMetadata(
-                messageType,
-                keyValueMetaData,
-                fileMetaData.getCreated_by());
-        return new ParquetMetadata(parquetFileMetadata, blocks);
+        return blocks;
     }
 
+    @VisibleForTesting
+    public FileMetaData getParquetMetadata()
+    {
+        return parquetMetadata;
+    }
+
     private static MessageType readParquetSchema(List<SchemaElement> schema)
@@ -222,4 +224,27 @@ private static Set<Encoding> readEncodings
         }
         return Collections.unmodifiableSet(columnEncodings);
     }
+
+    private static MessageType readMessageType(FileMetaData parquetMetadata, ParquetDataSourceId dataSourceId)
+            throws ParquetCorruptionException
+    {
+        List<SchemaElement> schema = parquetMetadata.getSchema();
+        validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty");
+
+        Iterator<SchemaElement> schemaIterator = schema.iterator();
+        SchemaElement rootSchema = schemaIterator.next();
+        Types.MessageTypeBuilder builder = Types.buildMessage();
+        readTypeSchema(builder, schemaIterator, rootSchema.getNum_children());
+        return builder.named(rootSchema.name);
+    }
+
+    private static Map<String, String> keyValueMetaData(FileMetaData parquetMetadata)
+    {
+        if (parquetMetadata.getKey_value_metadata() == null) {
+            return ImmutableMap.of();
+        }
+        return parquetMetadata.getKey_value_metadata()
+                .stream()
+                .collect(toImmutableMap(KeyValue::getKey, KeyValue::getValue, (_, second) -> second));
+    }
 }

@@ -87,7 +87,7 @@ public static ParquetMetadata readFooter
         InputStream metadataStream = buffer.slice(buffer.length() - completeFooterSize, metadataLength).getInput();
 
         FileMetaData fileMetaData = readFileMetaData(metadataStream);
-        ParquetMetadata parquetMetadata = ParquetMetadata.createParquetMetadata(fileMetaData, dataSource.getId());
+        ParquetMetadata parquetMetadata = new ParquetMetadata(fileMetaData, dataSource.getId());
         validateFileMetadata(dataSource.getId(), parquetMetadata.getFileMetaData(), parquetWriteValidation);
         return parquetMetadata;
     }

@@ -350,7 +350,7 @@ private void flush()
             columnMetaDataBuilder.add(columnMetaData);
             currentOffset += columnMetaData.getTotal_compressed_size();
         }
-        updateRowGroups(columnMetaDataBuilder.build());
+        updateRowGroups(columnMetaDataBuilder.build(), outputStream.longSize());
 
         // flush pages
         for (BufferData bufferData : bufferDataList) {
@@ -409,12 +409,14 @@ private void writeBloomFilters
         }
     }
 
-    private void updateRowGroups(List<ColumnMetaData> columnMetaData)
+    private void updateRowGroups(List<ColumnMetaData> columnMetaData, long fileOffset)
     {
         long totalCompressedBytes = columnMetaData.stream().mapToLong(ColumnMetaData::getTotal_compressed_size).sum();
         long totalBytes = columnMetaData.stream().mapToLong(ColumnMetaData::getTotal_uncompressed_size).sum();
         ImmutableList<org.apache.parquet.format.ColumnChunk> columnChunks = columnMetaData.stream().map(ParquetWriter::toColumnChunk).collect(toImmutableList());
-        fileFooter.addRowGroup(new RowGroup(columnChunks, totalBytes, rows).setTotal_compressed_size(totalCompressedBytes));
+        fileFooter.addRowGroup(new RowGroup(columnChunks, totalBytes, rows)
+                .setTotal_compressed_size(totalCompressedBytes)
+                .setFile_offset(fileOffset));
     }
 
     private static Slice serializeFooter(FileMetaData fileMetaData)

@@ -46,6 +46,7 @@
 import org.apache.parquet.format.CompressionCodec;
 import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.RowGroup;
 import org.apache.parquet.format.Util;
 import org.apache.parquet.schema.PrimitiveType;
 import org.assertj.core.data.Percentage;
@@ -379,6 +380,38 @@ public void testDictionaryPageOffset()
         }
     }
 
+    @Test
+    void testRowGroupOffset()
+            throws IOException
+    {
+        // Write a file with 100 rows per row-group
+        List<String> columnNames = ImmutableList.of("columnA", "columnB");
+        List<Type> types = ImmutableList.of(INTEGER, BIGINT);
+
+        ParquetDataSource dataSource = new TestingParquetDataSource(
+                writeParquetFile(
+                        ParquetWriterOptions.builder()
+                                .setMaxBlockSize(DataSize.ofBytes(1000))
+                                .build(),
+                        types,
+                        columnNames,
+                        generateInputPages(types, 100, 10)),
+                new ParquetReaderOptions());
+
+        ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
+        List<BlockMetadata> blocks = parquetMetadata.getBlocks();
+        assertThat(blocks.size()).isGreaterThan(1);
+
+        List<RowGroup> rowGroups = parquetMetadata.getParquetMetadata().getRow_groups();
+        assertThat(rowGroups.size()).isEqualTo(blocks.size());
+        for (int rowGroupIndex = 0; rowGroupIndex < rowGroups.size(); rowGroupIndex++) {
+            RowGroup rowGroup = rowGroups.get(rowGroupIndex);
+            assertThat(rowGroup.isSetFile_offset()).isTrue();
+            BlockMetadata blockMetadata = blocks.get(rowGroupIndex);
+            assertThat(blockMetadata.getStartingPos()).isEqualTo(rowGroup.getFile_offset());
+        }
+    }
+
     @ParameterizedTest
     @MethodSource("testWriteBloomFiltersParams")
     public void testWriteBloomFilters(Type type, List<?> data)

@@ -183,7 +183,7 @@ public DataFileInfo getDataFileInfo()
     {
         Location path = rootTableLocation.appendPath(relativeFilePath);
         FileMetaData fileMetaData = fileWriter.getFileMetadata();
-        ParquetMetadata parquetMetadata = ParquetMetadata.createParquetMetadata(fileMetaData, new ParquetDataSourceId(path.toString()));
+        ParquetMetadata parquetMetadata = new ParquetMetadata(fileMetaData, new ParquetDataSourceId(path.toString()));
 
         return new DataFileInfo(
                 relativeFilePath,

@@ -82,12 +82,12 @@ public FileMetrics getFileMetrics()
     {
         ParquetMetadata parquetMetadata;
         try {
-            parquetMetadata = ParquetMetadata.createParquetMetadata(parquetFileWriter.getFileMetadata(), new ParquetDataSourceId(location.toString()));
+            parquetMetadata = new ParquetMetadata(parquetFileWriter.getFileMetadata(), new ParquetDataSourceId(location.toString()));
+            return new FileMetrics(footerMetrics(parquetMetadata, Stream.empty(), metricsConfig), Optional.of(getSplitOffsets(parquetMetadata)));
         }
         catch (IOException e) {
             throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Error creating metadata for Parquet file %s", location), e);
         }
-        return new FileMetrics(footerMetrics(parquetMetadata, Stream.empty(), metricsConfig), Optional.of(getSplitOffsets(parquetMetadata)));
     }
 
     @Override

@@ -15,6 +15,7 @@
 package io.trino.plugin.iceberg.util;
 
 import com.google.common.collect.ImmutableList;
+import io.trino.parquet.ParquetCorruptionException;
 import io.trino.parquet.metadata.BlockMetadata;
 import io.trino.parquet.metadata.ColumnChunkMetadata;
 import io.trino.parquet.metadata.ParquetMetadata;
@@ -69,6 +70,7 @@ public final class ParquetUtil
     private ParquetUtil() {}
 
     public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetrics<?>> fieldMetrics, MetricsConfig metricsConfig)
+            throws ParquetCorruptionException
     {
         return footerMetrics(metadata, fieldMetrics, metricsConfig, null);
     }
@@ -78,6 +80,7 @@ public static Metrics footerMetrics(
             Stream<FieldMetrics<?>> fieldMetrics,
             MetricsConfig metricsConfig,
             NameMapping nameMapping)
+            throws ParquetCorruptionException
     {
         requireNonNull(fieldMetrics, "fieldMetrics should not be null");
 
@@ -156,9 +159,11 @@ public static Metrics footerMetrics(
     }
 
     public static List<Long> getSplitOffsets(ParquetMetadata metadata)
+            throws ParquetCorruptionException
     {
-        List<Long> splitOffsets = new ArrayList<>(metadata.getBlocks().size());
-        for (BlockMetadata blockMetaData : metadata.getBlocks()) {
+        List<BlockMetadata> blocks = metadata.getBlocks();
+        List<Long> splitOffsets = new ArrayList<>(blocks.size());
+        for (BlockMetadata blockMetaData : blocks) {
             splitOffsets.add(blockMetaData.getStartingPos());
         }
         Collections.sort(splitOffsets);

@@ -139,18 +139,20 @@ private static boolean checkOrcFileSorting
     public static boolean checkParquetFileSorting(TrinoInputFile inputFile, String sortColumnName)
     {
         ParquetMetadata parquetMetadata;
+        List<BlockMetadata> blocks;
         try {
             parquetMetadata = MetadataReader.readFooter(
                     new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats()),
                     Optional.empty());
+            blocks = parquetMetadata.getBlocks();
         }
         catch (IOException e) {
             throw new UncheckedIOException(e);
         }
 
         Comparable previousMax = null;
-        verify(parquetMetadata.getBlocks().size() > 1, "Test must produce at least two row groups");
-        for (BlockMetadata blockMetaData : parquetMetadata.getBlocks()) {
+        verify(blocks.size() > 1, "Test must produce at least two row groups");
+        for (BlockMetadata blockMetaData : blocks) {
             ColumnChunkMetadata columnMetadata = blockMetaData.columns().stream()
                     .filter(column -> getOnlyElement(column.getPath().iterator()).equalsIgnoreCase(sortColumnName))
                     .collect(onlyElement());