Optimize parsing of parquet footers #24618

Merged: 4 commits, Jan 3, 2025
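As the diff below shows, the change reworks io.trino.parquet.metadata.ParquetMetadata so that constructing it from a decoded thrift FileMetaData eagerly parses only the schema and the key-value metadata. Row-group and column-chunk metadata are instead materialized on demand via getBlocks(splitStart, splitLength), which skips row groups whose file_offset falls outside the requested split, and getFilteredRowGroups passes the split bounds through.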
io/trino/parquet/metadata/ParquetMetadata.java
@@ -13,32 +13,252 @@
*/
package io.trino.parquet.metadata;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSourceId;
import io.trino.parquet.reader.MetadataReader;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.format.ColumnChunk;
import org.apache.parquet.format.ColumnMetaData;
import org.apache.parquet.format.FileMetaData;
import org.apache.parquet.format.KeyValue;
import org.apache.parquet.format.RowGroup;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.parquet.ParquetMetadataConverter.convertEncodingStats;
import static io.trino.parquet.ParquetMetadataConverter.getEncoding;
import static io.trino.parquet.ParquetMetadataConverter.getLogicalTypeAnnotation;
import static io.trino.parquet.ParquetMetadataConverter.getPrimitive;
import static io.trino.parquet.ParquetMetadataConverter.toColumnIndexReference;
import static io.trino.parquet.ParquetMetadataConverter.toOffsetIndexReference;
import static io.trino.parquet.ParquetValidationUtils.validateParquet;
import static java.util.Objects.requireNonNull;

public class ParquetMetadata
{
    private static final Logger log = Logger.get(ParquetMetadata.class);

    private final FileMetaData parquetMetadata;
    private final ParquetDataSourceId dataSourceId;
    private final FileMetadata fileMetadata;

    public ParquetMetadata(FileMetaData parquetMetadata, ParquetDataSourceId dataSourceId)
            throws ParquetCorruptionException
    {
        this.fileMetadata = new FileMetadata(
                readMessageType(parquetMetadata, dataSourceId),
                keyValueMetaData(parquetMetadata),
                parquetMetadata.getCreated_by());
        this.parquetMetadata = parquetMetadata;
        this.dataSourceId = requireNonNull(dataSourceId, "dataSourceId is null");
    }

    public FileMetadata getFileMetaData()
    {
        return fileMetadata;
    }

    @Override
    public String toString()
    {
        return toStringHelper(this)
                .add("parquetMetadata", parquetMetadata)
                .toString();
    }

    public List<BlockMetadata> getBlocks()
            throws ParquetCorruptionException
    {
        return getBlocks(0, Long.MAX_VALUE);
    }

    public List<BlockMetadata> getBlocks(long splitStart, long splitLength)
            throws ParquetCorruptionException
    {
        List<SchemaElement> schema = parquetMetadata.getSchema();
        validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty");

        MessageType messageType = readParquetSchema(schema);
        List<BlockMetadata> blocks = new ArrayList<>();
        List<RowGroup> rowGroups = parquetMetadata.getRow_groups();
        if (rowGroups != null) {
            for (RowGroup rowGroup : rowGroups) {
                if (rowGroup.isSetFile_offset()) {
                    // Skip row groups outside the split before decoding their column chunk metadata
                    long rowGroupStart = rowGroup.getFile_offset();
                    boolean splitContainsRowGroup = splitStart <= rowGroupStart && rowGroupStart < splitStart + splitLength;
                    if (!splitContainsRowGroup) {
                        continue;
                    }
                }

                List<ColumnChunk> columns = rowGroup.getColumns();
                validateParquet(!columns.isEmpty(), dataSourceId, "No columns in row group: %s", rowGroup);
                String filePath = columns.get(0).getFile_path();
                ImmutableList.Builder<ColumnChunkMetadata> columnMetadataBuilder = ImmutableList.builderWithExpectedSize(columns.size());
                for (ColumnChunk columnChunk : columns) {
                    validateParquet(
                            (filePath == null && columnChunk.getFile_path() == null)
                                    || (filePath != null && filePath.equals(columnChunk.getFile_path())),
                            dataSourceId,
                            "all column chunks of the same row group must be in the same file");
                    ColumnMetaData metaData = columnChunk.meta_data;
                    String[] path = metaData.path_in_schema.stream()
                            .map(value -> value.toLowerCase(Locale.ENGLISH))
                            .toArray(String[]::new);
                    ColumnPath columnPath = ColumnPath.get(path);
                    PrimitiveType primitiveType = messageType.getType(columnPath.toArray()).asPrimitiveType();
                    ColumnChunkMetadata column = ColumnChunkMetadata.get(
                            columnPath,
                            primitiveType,
                            CompressionCodecName.fromParquet(metaData.codec),
                            convertEncodingStats(metaData.encoding_stats),
                            readEncodings(metaData.encodings),
                            MetadataReader.readStats(Optional.ofNullable(parquetMetadata.getCreated_by()), Optional.ofNullable(metaData.statistics), primitiveType),
                            metaData.data_page_offset,
                            metaData.dictionary_page_offset,
                            metaData.num_values,
                            metaData.total_compressed_size,
                            metaData.total_uncompressed_size);
                    column.setColumnIndexReference(toColumnIndexReference(columnChunk));
                    column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
                    column.setBloomFilterOffset(metaData.bloom_filter_offset);
                    columnMetadataBuilder.add(column);
                }
                blocks.add(new BlockMetadata(rowGroup.getNum_rows(), columnMetadataBuilder.build()));
            }
        }

        return blocks;
    }

    @VisibleForTesting
    public FileMetaData getParquetMetadata()
    {
        return parquetMetadata;
    }

    private static MessageType readParquetSchema(List<SchemaElement> schema)
    {
        Iterator<SchemaElement> schemaIterator = schema.iterator();
        SchemaElement rootSchema = schemaIterator.next();
        Types.MessageTypeBuilder builder = Types.buildMessage();
        readTypeSchema(builder, schemaIterator, rootSchema.getNum_children());
        return builder.named(rootSchema.name);
    }

    private static void readTypeSchema(Types.GroupBuilder<?> builder, Iterator<SchemaElement> schemaIterator, int typeCount)
    {
        for (int i = 0; i < typeCount; i++) {
            SchemaElement element = schemaIterator.next();
            Types.Builder<?, ?> typeBuilder;
            if (element.type == null) {
                typeBuilder = builder.group(Type.Repetition.valueOf(element.repetition_type.name()));
                readTypeSchema((Types.GroupBuilder<?>) typeBuilder, schemaIterator, element.num_children);
            }
            else {
                Types.PrimitiveBuilder<?> primitiveBuilder = builder.primitive(getPrimitive(element.type), Type.Repetition.valueOf(element.repetition_type.name()));
                if (element.isSetType_length()) {
                    primitiveBuilder.length(element.type_length);
                }
                if (element.isSetPrecision()) {
                    primitiveBuilder.precision(element.precision);
                }
                if (element.isSetScale()) {
                    primitiveBuilder.scale(element.scale);
                }
                typeBuilder = primitiveBuilder;
            }

            // Reading of element.logicalType and element.converted_type corresponds to parquet-mr's code at
            // https://github.com/apache/parquet-mr/blob/apache-parquet-1.12.0/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L1568-L1582
            LogicalTypeAnnotation annotationFromLogicalType = null;
            if (element.isSetLogicalType()) {
                annotationFromLogicalType = getLogicalTypeAnnotation(element.logicalType);
                typeBuilder.as(annotationFromLogicalType);
            }
            if (element.isSetConverted_type()) {
                LogicalTypeAnnotation annotationFromConvertedType = getLogicalTypeAnnotation(element.converted_type, element);
                if (annotationFromLogicalType != null) {
                    // Both element.logicalType and element.converted_type are set
                    if (annotationFromLogicalType.toOriginalType() == annotationFromConvertedType.toOriginalType()) {
                        // element.converted_type matches element.logicalType, even though annotationFromLogicalType may differ from annotationFromConvertedType,
                        // e.g. logicalType TIMESTAMP(isAdjustedToUTC=true, MICROS) and converted_type TIMESTAMP_MICROS both map to OriginalType.TIMESTAMP_MICROS.
                        // Following parquet-mr behavior, we favor the LogicalTypeAnnotation derived from element.logicalType, as potentially containing more information.
                    }
                    else {
                        // Following parquet-mr behavior, issue a warning and let converted_type take precedence.
                        log.warn("Converted type and logical type metadata map to different OriginalType (convertedType: %s, logical type: %s). Using value in converted type.",
                                element.converted_type, element.logicalType);
                        // parquet-mr reads only OriginalType from converted_type. We retain the full LogicalTypeAnnotation
                        // 1. for compatibility, as previous Trino reader code would read LogicalTypeAnnotation from element.converted_type and some additional fields;
                        // 2. so that we override the LogicalTypeAnnotation read from element.logicalType when a mismatch is detected.
                        typeBuilder.as(annotationFromConvertedType);
                    }
                }
                else {
                    // parquet-mr reads only OriginalType from converted_type. We retain the full LogicalTypeAnnotation for compatibility, as previous
                    // Trino reader code would read LogicalTypeAnnotation from element.converted_type and some additional fields.
                    typeBuilder.as(annotationFromConvertedType);
                }
            }

            if (element.isSetField_id()) {
                typeBuilder.id(element.field_id);
            }
            typeBuilder.named(element.name.toLowerCase(Locale.ENGLISH));
        }
    }

    private static Set<Encoding> readEncodings(List<org.apache.parquet.format.Encoding> encodings)
    {
        Set<org.apache.parquet.column.Encoding> columnEncodings = new HashSet<>();
        for (org.apache.parquet.format.Encoding encoding : encodings) {
            columnEncodings.add(getEncoding(encoding));
        }
        return Collections.unmodifiableSet(columnEncodings);
    }

    private static MessageType readMessageType(FileMetaData parquetMetadata, ParquetDataSourceId dataSourceId)
            throws ParquetCorruptionException
    {
        List<SchemaElement> schema = parquetMetadata.getSchema();
        validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty");

        Iterator<SchemaElement> schemaIterator = schema.iterator();
        SchemaElement rootSchema = schemaIterator.next();
        Types.MessageTypeBuilder builder = Types.buildMessage();
        readTypeSchema(builder, schemaIterator, rootSchema.getNum_children());
        return builder.named(rootSchema.name);
    }

    private static Map<String, String> keyValueMetaData(FileMetaData parquetMetadata)
    {
        if (parquetMetadata.getKey_value_metadata() == null) {
            return ImmutableMap.of();
        }
        // On duplicate keys, the last value wins
        return parquetMetadata.getKey_value_metadata()
                .stream()
                .collect(toImmutableMap(KeyValue::getKey, KeyValue::getValue, (_, second) -> second));
    }
}
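Taken together, parsing a footer no longer materializes metadata for every row group up front. Below is a minimal sketch of how a caller benefits, assuming the two-argument MetadataReader.readFooter(ParquetDataSource, Optional) overload as the entry point that produces this ParquetMetadata; the FooterParsingSketch class and its method are illustrative, not part of the PR:

import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.metadata.BlockMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.reader.MetadataReader;

import java.io.IOException;
import java.util.List;
import java.util.Optional;

// Illustrative helper, not part of the PR
class FooterParsingSketch
{
    static List<BlockMetadata> blocksForSplit(ParquetDataSource dataSource, long splitStart, long splitLength)
            throws IOException
    {
        // Constructing ParquetMetadata decodes only the schema and the
        // key-value metadata of the thrift footer
        ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());

        // Row-group and column-chunk metadata are materialized on demand;
        // row groups whose file_offset lies outside
        // [splitStart, splitStart + splitLength) are skipped entirely
        return parquetMetadata.getBlocks(splitStart, splitLength);
    }
}

Callers that only need getFileMetaData() are served entirely from the fields decoded in the constructor and never touch row-group decoding at all.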
@@ -27,6 +27,7 @@
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.metadata.BlockMetadata;
import io.trino.parquet.metadata.ColumnChunkMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.metadata.PrunedBlockMetadata;
import io.trino.parquet.reader.RowGroupInfo;
import io.trino.spi.predicate.TupleDomain;
@@ -183,7 +184,7 @@ public static List<RowGroupInfo> getFilteredRowGroups(
            long splitStart,
            long splitLength,
            ParquetDataSource dataSource,
            ParquetMetadata parquetMetadata,
            List<TupleDomain<ColumnDescriptor>> parquetTupleDomains,
            List<TupleDomainParquetPredicate> parquetPredicates,
            Map<List<String>, ColumnDescriptor> descriptorsByPath,
@@ -194,7 +195,7 @@
    {
        long fileRowCount = 0;
        ImmutableList.Builder<RowGroupInfo> rowGroupInfoBuilder = ImmutableList.builder();
        for (BlockMetadata block : parquetMetadata.getBlocks(splitStart, splitLength)) {
            long blockStart = block.getStartingPos();
            boolean splitContainsBlock = splitStart <= blockStart && blockStart < splitStart + splitLength;
            if (splitContainsBlock) {
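getFilteredRowGroups previously received a pre-materialized List<BlockMetadata>; it now takes the lazily parsed ParquetMetadata plus the split bounds, so each split decodes only the row groups whose file_offset falls inside it. The existing splitContainsBlock check appears to remain as a fallback for footers whose row groups do not set file_offset.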