diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java b/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java index 9363add2ca6c..56091d962a08 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java @@ -13,32 +13,252 @@ */ package io.trino.parquet.metadata; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.airlift.log.Logger; +import io.trino.parquet.ParquetCorruptionException; +import io.trino.parquet.ParquetDataSourceId; +import io.trino.parquet.reader.MetadataReader; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.format.ColumnChunk; +import org.apache.parquet.format.ColumnMetaData; +import org.apache.parquet.format.FileMetaData; +import org.apache.parquet.format.KeyValue; +import org.apache.parquet.format.RowGroup; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static io.trino.parquet.ParquetMetadataConverter.convertEncodingStats; +import static io.trino.parquet.ParquetMetadataConverter.getEncoding; +import static io.trino.parquet.ParquetMetadataConverter.getLogicalTypeAnnotation; +import static io.trino.parquet.ParquetMetadataConverter.getPrimitive; +import static io.trino.parquet.ParquetMetadataConverter.toColumnIndexReference; +import static io.trino.parquet.ParquetMetadataConverter.toOffsetIndexReference; +import static io.trino.parquet.ParquetValidationUtils.validateParquet; +import static java.util.Objects.requireNonNull; public class ParquetMetadata { - private final FileMetadata fileMetaData; - private final List blocks; + private static final Logger log = Logger.get(ParquetMetadata.class); + + private final FileMetaData parquetMetadata; + private final ParquetDataSourceId dataSourceId; + private final FileMetadata fileMetadata; + + public ParquetMetadata(FileMetaData parquetMetadata, ParquetDataSourceId dataSourceId) + throws ParquetCorruptionException + { + this.fileMetadata = new FileMetadata( + readMessageType(parquetMetadata, dataSourceId), + keyValueMetaData(parquetMetadata), + parquetMetadata.getCreated_by()); + this.parquetMetadata = parquetMetadata; + this.dataSourceId = requireNonNull(dataSourceId, "dataSourceId is null"); + } + + public FileMetadata getFileMetaData() + { + return fileMetadata; + } - public ParquetMetadata(FileMetadata fileMetaData, List blocks) + @Override + public String toString() { - this.fileMetaData = fileMetaData; - this.blocks = blocks; + return toStringHelper(this) + .add("parquetMetadata", parquetMetadata) + .toString(); } public List getBlocks() + throws ParquetCorruptionException { + return 
getBlocks(0, Long.MAX_VALUE); + } + + public List getBlocks(long splitStart, long splitLength) + throws ParquetCorruptionException + { + List schema = parquetMetadata.getSchema(); + validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty"); + + MessageType messageType = readParquetSchema(schema); + List blocks = new ArrayList<>(); + List rowGroups = parquetMetadata.getRow_groups(); + if (rowGroups != null) { + for (RowGroup rowGroup : rowGroups) { + if (rowGroup.isSetFile_offset()) { + long rowGroupStart = rowGroup.getFile_offset(); + boolean splitContainsRowGroup = splitStart <= rowGroupStart && rowGroupStart < splitStart + splitLength; + if (!splitContainsRowGroup) { + continue; + } + } + + List columns = rowGroup.getColumns(); + validateParquet(!columns.isEmpty(), dataSourceId, "No columns in row group: %s", rowGroup); + String filePath = columns.get(0).getFile_path(); + ImmutableList.Builder columnMetadataBuilder = ImmutableList.builderWithExpectedSize(columns.size()); + for (ColumnChunk columnChunk : columns) { + validateParquet( + (filePath == null && columnChunk.getFile_path() == null) + || (filePath != null && filePath.equals(columnChunk.getFile_path())), + dataSourceId, + "all column chunks of the same row group must be in the same file"); + ColumnMetaData metaData = columnChunk.meta_data; + String[] path = metaData.path_in_schema.stream() + .map(value -> value.toLowerCase(Locale.ENGLISH)) + .toArray(String[]::new); + ColumnPath columnPath = ColumnPath.get(path); + PrimitiveType primitiveType = messageType.getType(columnPath.toArray()).asPrimitiveType(); + ColumnChunkMetadata column = ColumnChunkMetadata.get( + columnPath, + primitiveType, + CompressionCodecName.fromParquet(metaData.codec), + convertEncodingStats(metaData.encoding_stats), + readEncodings(metaData.encodings), + MetadataReader.readStats(Optional.ofNullable(parquetMetadata.getCreated_by()), Optional.ofNullable(metaData.statistics), primitiveType), + metaData.data_page_offset, + metaData.dictionary_page_offset, + metaData.num_values, + metaData.total_compressed_size, + metaData.total_uncompressed_size); + column.setColumnIndexReference(toColumnIndexReference(columnChunk)); + column.setOffsetIndexReference(toOffsetIndexReference(columnChunk)); + column.setBloomFilterOffset(metaData.bloom_filter_offset); + columnMetadataBuilder.add(column); + } + blocks.add(new BlockMetadata(rowGroup.getNum_rows(), columnMetadataBuilder.build())); + } + } + return blocks; } - public FileMetadata getFileMetaData() + @VisibleForTesting + public FileMetaData getParquetMetadata() { - return fileMetaData; + return parquetMetadata; } - @Override - public String toString() + private static MessageType readParquetSchema(List schema) + { + Iterator schemaIterator = schema.iterator(); + SchemaElement rootSchema = schemaIterator.next(); + Types.MessageTypeBuilder builder = Types.buildMessage(); + readTypeSchema(builder, schemaIterator, rootSchema.getNum_children()); + return builder.named(rootSchema.name); + } + + private static void readTypeSchema(Types.GroupBuilder builder, Iterator schemaIterator, int typeCount) + { + for (int i = 0; i < typeCount; i++) { + SchemaElement element = schemaIterator.next(); + Types.Builder typeBuilder; + if (element.type == null) { + typeBuilder = builder.group(Type.Repetition.valueOf(element.repetition_type.name())); + readTypeSchema((Types.GroupBuilder) typeBuilder, schemaIterator, element.num_children); + } + else { + Types.PrimitiveBuilder primitiveBuilder = 
builder.primitive(getPrimitive(element.type), Type.Repetition.valueOf(element.repetition_type.name())); + if (element.isSetType_length()) { + primitiveBuilder.length(element.type_length); + } + if (element.isSetPrecision()) { + primitiveBuilder.precision(element.precision); + } + if (element.isSetScale()) { + primitiveBuilder.scale(element.scale); + } + typeBuilder = primitiveBuilder; + } + + // Reading of element.logicalType and element.converted_type corresponds to parquet-mr's code at + // https://github.com/apache/parquet-mr/blob/apache-parquet-1.12.0/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L1568-L1582 + LogicalTypeAnnotation annotationFromLogicalType = null; + if (element.isSetLogicalType()) { + annotationFromLogicalType = getLogicalTypeAnnotation(element.logicalType); + typeBuilder.as(annotationFromLogicalType); + } + if (element.isSetConverted_type()) { + LogicalTypeAnnotation annotationFromConvertedType = getLogicalTypeAnnotation(element.converted_type, element); + if (annotationFromLogicalType != null) { + // Both element.logicalType and element.converted_type set + if (annotationFromLogicalType.toOriginalType() == annotationFromConvertedType.toOriginalType()) { + // element.converted_type matches element.logicalType, even though annotationFromLogicalType may differ from annotationFromConvertedType + // Following parquet-mr behavior, we favor LogicalTypeAnnotation derived from element.logicalType, as potentially containing more information. + } + else { + // Following parquet-mr behavior, issue warning and let converted_type take precedence. + log.warn("Converted type and logical type metadata map to different OriginalType (convertedType: %s, logical type: %s). Using value in converted type.", + element.converted_type, element.logicalType); + // parquet-mr reads only OriginalType from converted_type. We retain full LogicalTypeAnnotation + // 1. for compatibility, as previous Trino reader code would read LogicalTypeAnnotation from element.converted_type and some additional fields. + // 2. so that we override LogicalTypeAnnotation annotation read from element.logicalType in case of mismatch detected. + typeBuilder.as(annotationFromConvertedType); + } + } + else { + // parquet-mr reads only OriginalType from converted_type. We retain full LogicalTypeAnnotation for compatibility, as previous + // Trino reader code would read LogicalTypeAnnotation from element.converted_type and some additional fields. 
+ typeBuilder.as(annotationFromConvertedType); + } + } + + if (element.isSetField_id()) { + typeBuilder.id(element.field_id); + } + typeBuilder.named(element.name.toLowerCase(Locale.ENGLISH)); + } + } + + private static Set readEncodings(List encodings) + { + Set columnEncodings = new HashSet<>(); + for (org.apache.parquet.format.Encoding encoding : encodings) { + columnEncodings.add(getEncoding(encoding)); + } + return Collections.unmodifiableSet(columnEncodings); + } + + private static MessageType readMessageType(FileMetaData parquetMetadata, ParquetDataSourceId dataSourceId) + throws ParquetCorruptionException + { + List schema = parquetMetadata.getSchema(); + validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty"); + + Iterator schemaIterator = schema.iterator(); + SchemaElement rootSchema = schemaIterator.next(); + Types.MessageTypeBuilder builder = Types.buildMessage(); + readTypeSchema(builder, schemaIterator, rootSchema.getNum_children()); + return builder.named(rootSchema.name); + } + + private static Map keyValueMetaData(FileMetaData parquetMetadata) { - return "ParquetMetaData{" + fileMetaData + ", blocks: " + blocks + "}"; + if (parquetMetadata.getKey_value_metadata() == null) { + return ImmutableMap.of(); + } + return parquetMetadata.getKey_value_metadata() + .stream() + .collect(toImmutableMap(KeyValue::getKey, KeyValue::getValue, (_, second) -> second)); } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java index 6901bb23a4e6..978e915a7a56 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java @@ -27,6 +27,7 @@ import io.trino.parquet.ParquetReaderOptions; import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ColumnChunkMetadata; +import io.trino.parquet.metadata.ParquetMetadata; import io.trino.parquet.metadata.PrunedBlockMetadata; import io.trino.parquet.reader.RowGroupInfo; import io.trino.spi.predicate.TupleDomain; @@ -183,7 +184,7 @@ public static List getFilteredRowGroups( long splitStart, long splitLength, ParquetDataSource dataSource, - List blocksMetaData, + ParquetMetadata parquetMetadata, List> parquetTupleDomains, List parquetPredicates, Map, ColumnDescriptor> descriptorsByPath, @@ -194,7 +195,7 @@ public static List getFilteredRowGroups( { long fileRowCount = 0; ImmutableList.Builder rowGroupInfoBuilder = ImmutableList.builder(); - for (BlockMetadata block : blocksMetaData) { + for (BlockMetadata block : parquetMetadata.getBlocks(splitStart, splitLength)) { long blockStart = block.getStartingPos(); boolean splitContainsBlock = splitStart <= blockStart && blockStart < splitStart + splitLength; if (splitContainsBlock) { diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java index fe0635646f98..e543841f20eb 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java @@ -13,57 +13,27 @@ */ package io.trino.parquet.reader; -import com.google.common.collect.ImmutableList; -import io.airlift.log.Logger; import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSource; import 
io.trino.parquet.ParquetDataSourceId; import io.trino.parquet.ParquetWriteValidation; -import io.trino.parquet.metadata.BlockMetadata; -import io.trino.parquet.metadata.ColumnChunkMetadata; import io.trino.parquet.metadata.FileMetadata; import io.trino.parquet.metadata.ParquetMetadata; import org.apache.parquet.CorruptStatistics; import org.apache.parquet.column.statistics.BinaryStatistics; -import org.apache.parquet.format.ColumnChunk; -import org.apache.parquet.format.ColumnMetaData; -import org.apache.parquet.format.Encoding; import org.apache.parquet.format.FileMetaData; -import org.apache.parquet.format.KeyValue; -import org.apache.parquet.format.RowGroup; -import org.apache.parquet.format.SchemaElement; import org.apache.parquet.format.Statistics; -import org.apache.parquet.hadoop.metadata.ColumnPath; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.LogicalTypeAnnotation; -import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type.Repetition; -import org.apache.parquet.schema.Types; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Locale; -import java.util.Map; import java.util.Optional; -import java.util.Set; -import static io.trino.parquet.ParquetMetadataConverter.convertEncodingStats; import static io.trino.parquet.ParquetMetadataConverter.fromParquetStatistics; -import static io.trino.parquet.ParquetMetadataConverter.getEncoding; -import static io.trino.parquet.ParquetMetadataConverter.getLogicalTypeAnnotation; -import static io.trino.parquet.ParquetMetadataConverter.getPrimitive; -import static io.trino.parquet.ParquetMetadataConverter.toColumnIndexReference; -import static io.trino.parquet.ParquetMetadataConverter.toOffsetIndexReference; import static io.trino.parquet.ParquetValidationUtils.validateParquet; import static java.lang.Boolean.FALSE; import static java.lang.Boolean.TRUE; @@ -73,8 +43,6 @@ public final class MetadataReader { - private static final Logger log = Logger.get(MetadataReader.class); - private static final Slice MAGIC = Slices.utf8Slice("PAR1"); private static final int POST_SCRIPT_SIZE = Integer.BYTES + MAGIC.length(); // Typical 1GB files produced by Trino were found to have footer size between 30-40KB @@ -119,144 +87,11 @@ public static ParquetMetadata readFooter(ParquetDataSource dataSource, Optional< InputStream metadataStream = buffer.slice(buffer.length() - completeFooterSize, metadataLength).getInput(); FileMetaData fileMetaData = readFileMetaData(metadataStream); - ParquetMetadata parquetMetadata = createParquetMetadata(fileMetaData, dataSource.getId()); + ParquetMetadata parquetMetadata = new ParquetMetadata(fileMetaData, dataSource.getId()); validateFileMetadata(dataSource.getId(), parquetMetadata.getFileMetaData(), parquetWriteValidation); return parquetMetadata; } - public static ParquetMetadata createParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId) - throws ParquetCorruptionException - { - List schema = fileMetaData.getSchema(); - validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty"); - - MessageType messageType = readParquetSchema(schema); - List blocks = new ArrayList<>(); - List rowGroups = fileMetaData.getRow_groups(); - if (rowGroups != null) { - for (RowGroup 
rowGroup : rowGroups) { - List columns = rowGroup.getColumns(); - validateParquet(!columns.isEmpty(), dataSourceId, "No columns in row group: %s", rowGroup); - String filePath = columns.get(0).getFile_path(); - ImmutableList.Builder columnMetadataBuilder = ImmutableList.builderWithExpectedSize(columns.size()); - for (ColumnChunk columnChunk : columns) { - validateParquet( - (filePath == null && columnChunk.getFile_path() == null) - || (filePath != null && filePath.equals(columnChunk.getFile_path())), - dataSourceId, - "all column chunks of the same row group must be in the same file"); - ColumnMetaData metaData = columnChunk.meta_data; - String[] path = metaData.path_in_schema.stream() - .map(value -> value.toLowerCase(Locale.ENGLISH)) - .toArray(String[]::new); - ColumnPath columnPath = ColumnPath.get(path); - PrimitiveType primitiveType = messageType.getType(columnPath.toArray()).asPrimitiveType(); - ColumnChunkMetadata column = ColumnChunkMetadata.get( - columnPath, - primitiveType, - CompressionCodecName.fromParquet(metaData.codec), - convertEncodingStats(metaData.encoding_stats), - readEncodings(metaData.encodings), - readStats(Optional.ofNullable(fileMetaData.getCreated_by()), Optional.ofNullable(metaData.statistics), primitiveType), - metaData.data_page_offset, - metaData.dictionary_page_offset, - metaData.num_values, - metaData.total_compressed_size, - metaData.total_uncompressed_size); - column.setColumnIndexReference(toColumnIndexReference(columnChunk)); - column.setOffsetIndexReference(toOffsetIndexReference(columnChunk)); - column.setBloomFilterOffset(metaData.bloom_filter_offset); - columnMetadataBuilder.add(column); - } - blocks.add(new BlockMetadata(rowGroup.getNum_rows(), columnMetadataBuilder.build())); - } - } - - Map keyValueMetaData = new HashMap<>(); - List keyValueList = fileMetaData.getKey_value_metadata(); - if (keyValueList != null) { - for (KeyValue keyValue : keyValueList) { - keyValueMetaData.put(keyValue.key, keyValue.value); - } - } - FileMetadata parquetFileMetadata = new FileMetadata( - messageType, - keyValueMetaData, - fileMetaData.getCreated_by()); - return new ParquetMetadata(parquetFileMetadata, blocks); - } - - private static MessageType readParquetSchema(List schema) - { - Iterator schemaIterator = schema.iterator(); - SchemaElement rootSchema = schemaIterator.next(); - Types.MessageTypeBuilder builder = Types.buildMessage(); - readTypeSchema(builder, schemaIterator, rootSchema.getNum_children()); - return builder.named(rootSchema.name); - } - - private static void readTypeSchema(Types.GroupBuilder builder, Iterator schemaIterator, int typeCount) - { - for (int i = 0; i < typeCount; i++) { - SchemaElement element = schemaIterator.next(); - Types.Builder typeBuilder; - if (element.type == null) { - typeBuilder = builder.group(Repetition.valueOf(element.repetition_type.name())); - readTypeSchema((Types.GroupBuilder) typeBuilder, schemaIterator, element.num_children); - } - else { - Types.PrimitiveBuilder primitiveBuilder = builder.primitive(getPrimitive(element.type), Repetition.valueOf(element.repetition_type.name())); - if (element.isSetType_length()) { - primitiveBuilder.length(element.type_length); - } - if (element.isSetPrecision()) { - primitiveBuilder.precision(element.precision); - } - if (element.isSetScale()) { - primitiveBuilder.scale(element.scale); - } - typeBuilder = primitiveBuilder; - } - - // Reading of element.logicalType and element.converted_type corresponds to parquet-mr's code at - // 
https://github.com/apache/parquet-mr/blob/apache-parquet-1.12.0/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L1568-L1582 - LogicalTypeAnnotation annotationFromLogicalType = null; - if (element.isSetLogicalType()) { - annotationFromLogicalType = getLogicalTypeAnnotation(element.logicalType); - typeBuilder.as(annotationFromLogicalType); - } - if (element.isSetConverted_type()) { - LogicalTypeAnnotation annotationFromConvertedType = getLogicalTypeAnnotation(element.converted_type, element); - if (annotationFromLogicalType != null) { - // Both element.logicalType and element.converted_type set - if (annotationFromLogicalType.toOriginalType() == annotationFromConvertedType.toOriginalType()) { - // element.converted_type matches element.logicalType, even though annotationFromLogicalType may differ from annotationFromConvertedType - // Following parquet-mr behavior, we favor LogicalTypeAnnotation derived from element.logicalType, as potentially containing more information. - } - else { - // Following parquet-mr behavior, issue warning and let converted_type take precedence. - log.warn("Converted type and logical type metadata map to different OriginalType (convertedType: %s, logical type: %s). Using value in converted type.", - element.converted_type, element.logicalType); - // parquet-mr reads only OriginalType from converted_type. We retain full LogicalTypeAnnotation - // 1. for compatibility, as previous Trino reader code would read LogicalTypeAnnotation from element.converted_type and some additional fields. - // 2. so that we override LogicalTypeAnnotation annotation read from element.logicalType in case of mismatch detected. - typeBuilder.as(annotationFromConvertedType); - } - } - else { - // parquet-mr reads only OriginalType from converted_type. We retain full LogicalTypeAnnotation for compatibility, as previous - // Trino reader code would read LogicalTypeAnnotation from element.converted_type and some additional fields. 
- typeBuilder.as(annotationFromConvertedType); - } - } - - if (element.isSetField_id()) { - typeBuilder.id(element.field_id); - } - typeBuilder.named(element.name.toLowerCase(Locale.ENGLISH)); - } - } - public static org.apache.parquet.column.statistics.Statistics readStats(Optional fileCreatedBy, Optional statisticsFromFile, PrimitiveType type) { Statistics statistics = statisticsFromFile.orElse(null); @@ -352,15 +187,6 @@ private static int commonPrefix(byte[] a, byte[] b) return commonPrefixLength; } - private static Set readEncodings(List encodings) - { - Set columnEncodings = new HashSet<>(); - for (Encoding encoding : encodings) { - columnEncodings.add(getEncoding(encoding)); - } - return Collections.unmodifiableSet(columnEncodings); - } - private static void validateFileMetadata(ParquetDataSourceId dataSourceId, FileMetadata fileMetaData, Optional parquetWriteValidation) throws ParquetCorruptionException { diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java index 1eed8ba73ef1..3bf3ab7010df 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java @@ -350,7 +350,7 @@ private void flush() columnMetaDataBuilder.add(columnMetaData); currentOffset += columnMetaData.getTotal_compressed_size(); } - updateRowGroups(columnMetaDataBuilder.build()); + updateRowGroups(columnMetaDataBuilder.build(), outputStream.longSize()); // flush pages for (BufferData bufferData : bufferDataList) { @@ -409,12 +409,14 @@ private void writeBloomFilters(List rowGroups, List columnMetaData) + private void updateRowGroups(List columnMetaData, long fileOffset) { long totalCompressedBytes = columnMetaData.stream().mapToLong(ColumnMetaData::getTotal_compressed_size).sum(); long totalBytes = columnMetaData.stream().mapToLong(ColumnMetaData::getTotal_uncompressed_size).sum(); ImmutableList columnChunks = columnMetaData.stream().map(ParquetWriter::toColumnChunk).collect(toImmutableList()); - fileFooter.addRowGroup(new RowGroup(columnChunks, totalBytes, rows).setTotal_compressed_size(totalCompressedBytes)); + fileFooter.addRowGroup(new RowGroup(columnChunks, totalBytes, rows) + .setTotal_compressed_size(totalCompressedBytes) + .setFile_offset(fileOffset)); } private static Slice serializeFooter(FileMetaData fileMetaData) diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java b/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java index aeda237c642f..0528adcd1166 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java @@ -155,7 +155,7 @@ public static ParquetReader createParquetReader( 0, input.getEstimatedSize(), input, - parquetMetadata.getBlocks(), + parquetMetadata, ImmutableList.of(parquetTupleDomain), ImmutableList.of(parquetPredicate), descriptorsByPath, diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetReader.java index 00ecd8388857..db8b4225770a 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetReader.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetReader.java @@ -20,6 +20,7 @@ import io.trino.memory.context.AggregatedMemoryContext; import io.trino.parquet.ParquetDataSource; 
import io.trino.parquet.ParquetReaderOptions; +import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ParquetMetadata; import io.trino.parquet.writer.ParquetWriterOptions; import io.trino.spi.Page; @@ -186,6 +187,44 @@ public void testBackwardsCompatibleRepeatedPrimitiveFieldDefinedAsPrimitive() .isInstanceOf(TrinoException.class); } + @Test + void testReadMetadataWithSplitOffset() + throws IOException + { + // Write a file with 100 rows per row-group + List columnNames = ImmutableList.of("columna", "columnb"); + List types = ImmutableList.of(INTEGER, BIGINT); + + ParquetDataSource dataSource = new TestingParquetDataSource( + writeParquetFile( + ParquetWriterOptions.builder() + .setMaxBlockSize(DataSize.ofBytes(1000)) + .build(), + types, + columnNames, + generateInputPages(types, 100, 5)), + new ParquetReaderOptions()); + + // Read both columns, 1 row group + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + List columnBlocks = parquetMetadata.getBlocks(0, 800); + assertThat(columnBlocks.size()).isEqualTo(1); + assertThat(columnBlocks.getFirst().columns().size()).isEqualTo(2); + assertThat(columnBlocks.getFirst().rowCount()).isEqualTo(100); + + // Read both columns, half row groups + parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + columnBlocks = parquetMetadata.getBlocks(0, 2500); + assertThat(columnBlocks.stream().allMatch(block -> block.columns().size() == 2)).isTrue(); + assertThat(columnBlocks.stream().mapToLong(BlockMetadata::rowCount).sum()).isEqualTo(300); + + // Read both columns, all row groups + parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + columnBlocks = parquetMetadata.getBlocks(); + assertThat(columnBlocks.stream().allMatch(block -> block.columns().size() == 2)).isTrue(); + assertThat(columnBlocks.stream().mapToLong(BlockMetadata::rowCount).sum()).isEqualTo(500); + } + private void testReadingOldParquetFiles(File file, List columnNames, Type columnType, List expectedValues) throws IOException { diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java index a80cbbcd00d7..2d3b7f6c1ac6 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java @@ -46,6 +46,7 @@ import org.apache.parquet.format.CompressionCodec; import org.apache.parquet.format.PageHeader; import org.apache.parquet.format.PageType; +import org.apache.parquet.format.RowGroup; import org.apache.parquet.format.Util; import org.apache.parquet.schema.PrimitiveType; import org.assertj.core.data.Percentage; @@ -379,6 +380,38 @@ public void testDictionaryPageOffset() } } + @Test + void testRowGroupOffset() + throws IOException + { + // Write a file with 100 rows per row-group + List columnNames = ImmutableList.of("columnA", "columnB"); + List types = ImmutableList.of(INTEGER, BIGINT); + + ParquetDataSource dataSource = new TestingParquetDataSource( + writeParquetFile( + ParquetWriterOptions.builder() + .setMaxBlockSize(DataSize.ofBytes(1000)) + .build(), + types, + columnNames, + generateInputPages(types, 100, 10)), + new ParquetReaderOptions()); + + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + List blocks = parquetMetadata.getBlocks(); + assertThat(blocks.size()).isGreaterThan(1); + + List rowGroups = 
parquetMetadata.getParquetMetadata().getRow_groups(); + assertThat(rowGroups.size()).isEqualTo(blocks.size()); + for (int rowGroupIndex = 0; rowGroupIndex < rowGroups.size(); rowGroupIndex++) { + RowGroup rowGroup = rowGroups.get(rowGroupIndex); + assertThat(rowGroup.isSetFile_offset()).isTrue(); + BlockMetadata blockMetadata = blocks.get(rowGroupIndex); + assertThat(blockMetadata.getStartingPos()).isEqualTo(rowGroup.getFile_offset()); + } + } + @ParameterizedTest @MethodSource("testWriteBloomFiltersParams") public void testWriteBloomFilters(Type type, List data) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java index 8f686205e239..c247261db38d 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java @@ -22,7 +22,6 @@ import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ColumnChunkMetadata; import io.trino.parquet.metadata.ParquetMetadata; -import io.trino.parquet.reader.MetadataReader; import io.trino.plugin.deltalake.DataFileInfo.DataFileType; import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeJsonFileStatistics; import io.trino.plugin.hive.FileWriter; @@ -184,7 +183,7 @@ public DataFileInfo getDataFileInfo() { Location path = rootTableLocation.appendPath(relativeFilePath); FileMetaData fileMetaData = fileWriter.getFileMetadata(); - ParquetMetadata parquetMetadata = MetadataReader.createParquetMetadata(fileMetaData, new ParquetDataSourceId(path.toString())); + ParquetMetadata parquetMetadata = new ParquetMetadata(fileMetaData, new ParquetDataSourceId(path.toString())); return new DataFileInfo( relativeFilePath, diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java index d7e92d3a063e..dc9f72d210e6 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java @@ -95,12 +95,12 @@ public void testCacheFileOperations() .add(new CacheOperation("Alluxio.writeCache", "00000000000000000002.json", 0, 658)) .add(new CacheOperation("InputFile.length", "00000000000000000003.json")) .add(new CacheOperation("InputFile.newStream", "_last_checkpoint")) - .add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 227)) - .add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 227)) - .add(new CacheOperation("Input.readFully", "key=p1/", 0, 227)) - .add(new CacheOperation("Input.readFully", "key=p2/", 0, 227)) - .add(new CacheOperation("Alluxio.writeCache", "key=p1/", 0, 227)) - .add(new CacheOperation("Alluxio.writeCache", "key=p2/", 0, 227)) + .add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 229)) + .add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 229)) + .add(new CacheOperation("Input.readFully", "key=p1/", 0, 229)) + .add(new CacheOperation("Input.readFully", "key=p2/", 0, 229)) + .add(new CacheOperation("Alluxio.writeCache", "key=p1/", 0, 229)) + .add(new CacheOperation("Alluxio.writeCache", "key=p2/", 0, 229)) .build()); assertFileSystemAccesses( "SELECT * FROM test_cache_file_operations", @@ -113,8 +113,8 @@ public void 
testCacheFileOperations() .add(new CacheOperation("InputFile.length", "00000000000000000002.json")) .add(new CacheOperation("InputFile.length", "00000000000000000003.json")) .add(new CacheOperation("InputFile.newStream", "_last_checkpoint")) - .add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 227)) - .add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 227)) + .add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 229)) + .add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 229)) .build()); assertUpdate("INSERT INTO test_cache_file_operations VALUES ('p3', '3-xyz')", 1); assertUpdate("INSERT INTO test_cache_file_operations VALUES ('p4', '4-xyz')", 1); @@ -139,17 +139,17 @@ public void testCacheFileOperations() .add(new CacheOperation("InputFile.length", "00000000000000000005.json")) .add(new CacheOperation("InputFile.length", "00000000000000000006.json")) .add(new CacheOperation("InputFile.newStream", "_last_checkpoint")) - .add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 227)) - .add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 227)) - .add(new CacheOperation("Alluxio.readCached", "key=p3/", 0, 227)) - .add(new CacheOperation("Alluxio.readCached", "key=p4/", 0, 227)) - .add(new CacheOperation("Alluxio.readCached", "key=p5/", 0, 227)) - .add(new CacheOperation("Input.readFully", "key=p3/", 0, 227)) - .add(new CacheOperation("Input.readFully", "key=p4/", 0, 227)) - .add(new CacheOperation("Input.readFully", "key=p5/", 0, 227)) - .add(new CacheOperation("Alluxio.writeCache", "key=p3/", 0, 227)) - .add(new CacheOperation("Alluxio.writeCache", "key=p4/", 0, 227)) - .add(new CacheOperation("Alluxio.writeCache", "key=p5/", 0, 227)) + .add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 229)) + .add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 229)) + .add(new CacheOperation("Alluxio.readCached", "key=p3/", 0, 229)) + .add(new CacheOperation("Alluxio.readCached", "key=p4/", 0, 229)) + .add(new CacheOperation("Alluxio.readCached", "key=p5/", 0, 229)) + .add(new CacheOperation("Input.readFully", "key=p3/", 0, 229)) + .add(new CacheOperation("Input.readFully", "key=p4/", 0, 229)) + .add(new CacheOperation("Input.readFully", "key=p5/", 0, 229)) + .add(new CacheOperation("Alluxio.writeCache", "key=p3/", 0, 229)) + .add(new CacheOperation("Alluxio.writeCache", "key=p4/", 0, 229)) + .add(new CacheOperation("Alluxio.writeCache", "key=p5/", 0, 229)) .build()); assertFileSystemAccesses( "SELECT * FROM test_cache_file_operations", @@ -168,11 +168,11 @@ public void testCacheFileOperations() .add(new CacheOperation("InputFile.length", "00000000000000000005.json")) .add(new CacheOperation("InputFile.length", "00000000000000000006.json")) .add(new CacheOperation("InputFile.newStream", "_last_checkpoint")) - .addCopies(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 227), 1) - .addCopies(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 227), 1) - .addCopies(new CacheOperation("Alluxio.readCached", "key=p3/", 0, 227), 1) - .addCopies(new CacheOperation("Alluxio.readCached", "key=p4/", 0, 227), 1) - .addCopies(new CacheOperation("Alluxio.readCached", "key=p5/", 0, 227), 1) + .addCopies(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 229), 1) + .addCopies(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 229), 1) + .addCopies(new CacheOperation("Alluxio.readCached", "key=p3/", 0, 229), 1) + .addCopies(new CacheOperation("Alluxio.readCached", "key=p4/", 0, 229), 1) + .addCopies(new CacheOperation("Alluxio.readCached", 
"key=p5/", 0, 229), 1) .build()); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMutableTransactionLog.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMutableTransactionLog.java index 04b3cbfba622..a69d2df8a19d 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMutableTransactionLog.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMutableTransactionLog.java @@ -79,12 +79,12 @@ public void testTableDataCachedWhileTransactionLogNotCached() .addCopies(new CacheFileSystemTraceUtils.CacheOperation("Input.readTail", "00000000000000000002.checkpoint.parquet"), 2) .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.length", "00000000000000000003.json")) .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.newStream", "_last_checkpoint")) - .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p1/", 0, 227)) - .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p2/", 0, 227)) - .add(new CacheFileSystemTraceUtils.CacheOperation("Input.readFully", "key=p1/", 0, 227)) - .add(new CacheFileSystemTraceUtils.CacheOperation("Input.readFully", "key=p2/", 0, 227)) - .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.writeCache", "key=p1/", 0, 227)) - .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.writeCache", "key=p2/", 0, 227)) + .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p1/", 0, 229)) + .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p2/", 0, 229)) + .add(new CacheFileSystemTraceUtils.CacheOperation("Input.readFully", "key=p1/", 0, 229)) + .add(new CacheFileSystemTraceUtils.CacheOperation("Input.readFully", "key=p2/", 0, 229)) + .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.writeCache", "key=p1/", 0, 229)) + .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.writeCache", "key=p2/", 0, 229)) .build()); assertFileSystemAccesses( "SELECT * FROM test_transaction_log_not_cached", @@ -93,8 +93,8 @@ public void testTableDataCachedWhileTransactionLogNotCached() .addCopies(new CacheFileSystemTraceUtils.CacheOperation("Input.readTail", "00000000000000000002.checkpoint.parquet"), 2) .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.length", "00000000000000000003.json")) .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.newStream", "_last_checkpoint")) - .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p1/", 0, 227)) - .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p2/", 0, 227)) + .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p1/", 0, 229)) + .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p2/", 0, 229)) .build()); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java index dc072f0967a0..f364a9e18739 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java @@ -253,7 +253,7 @@ public static ReaderPageSource createPageSource( start, length, dataSource, - parquetMetadata.getBlocks(), + parquetMetadata, parquetTupleDomains, 
parquetPredicates, descriptorsByPath, diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java index 3bb7b1c42a79..83d62c9ab160 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java @@ -218,7 +218,7 @@ private static ConnectorPageSource createPageSource( start, length, dataSource, - parquetMetadata.getBlocks(), + parquetMetadata, ImmutableList.of(parquetTupleDomain), ImmutableList.of(parquetPredicate), descriptorsByPath, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index 4b5abd936d98..1911d5d4426d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -891,7 +891,7 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource( start, length, dataSource, - parquetMetadata.getBlocks(), + parquetMetadata, ImmutableList.of(parquetTupleDomain), ImmutableList.of(parquetPredicate), descriptorsByPath, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java index 7f0716b66188..06cccccebe3c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java @@ -33,7 +33,6 @@ import java.util.Optional; import java.util.stream.Stream; -import static io.trino.parquet.reader.MetadataReader.createParquetMetadata; import static io.trino.plugin.iceberg.util.ParquetUtil.footerMetrics; import static io.trino.plugin.iceberg.util.ParquetUtil.getSplitOffsets; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; @@ -83,12 +82,12 @@ public FileMetrics getFileMetrics() { ParquetMetadata parquetMetadata; try { - parquetMetadata = createParquetMetadata(parquetFileWriter.getFileMetadata(), new ParquetDataSourceId(location.toString())); + parquetMetadata = new ParquetMetadata(parquetFileWriter.getFileMetadata(), new ParquetDataSourceId(location.toString())); + return new FileMetrics(footerMetrics(parquetMetadata, Stream.empty(), metricsConfig), Optional.of(getSplitOffsets(parquetMetadata))); } catch (IOException e) { throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Error creating metadata for Parquet file %s", location), e); } - return new FileMetrics(footerMetrics(parquetMetadata, Stream.empty(), metricsConfig), Optional.of(getSplitOffsets(parquetMetadata))); } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ParquetUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ParquetUtil.java index 0a676ca339ca..98f50940b419 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ParquetUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ParquetUtil.java @@ -15,6 +15,7 @@ package io.trino.plugin.iceberg.util; import com.google.common.collect.ImmutableList; +import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.metadata.BlockMetadata; import 
io.trino.parquet.metadata.ColumnChunkMetadata; import io.trino.parquet.metadata.ParquetMetadata; @@ -69,6 +70,7 @@ public final class ParquetUtil private ParquetUtil() {} public static Metrics footerMetrics(ParquetMetadata metadata, Stream> fieldMetrics, MetricsConfig metricsConfig) + throws ParquetCorruptionException { return footerMetrics(metadata, fieldMetrics, metricsConfig, null); } @@ -78,6 +80,7 @@ public static Metrics footerMetrics( Stream> fieldMetrics, MetricsConfig metricsConfig, NameMapping nameMapping) + throws ParquetCorruptionException { requireNonNull(fieldMetrics, "fieldMetrics should not be null"); @@ -156,9 +159,11 @@ public static Metrics footerMetrics( } public static List getSplitOffsets(ParquetMetadata metadata) + throws ParquetCorruptionException { - List splitOffsets = new ArrayList<>(metadata.getBlocks().size()); - for (BlockMetadata blockMetaData : metadata.getBlocks()) { + List blocks = metadata.getBlocks(); + List splitOffsets = new ArrayList<>(blocks.size()); + for (BlockMetadata blockMetaData : blocks) { splitOffsets.add(blockMetaData.getStartingPos()); } Collections.sort(splitOffsets); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java index 17d8695f61f5..35f514551533 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java @@ -139,18 +139,20 @@ private static boolean checkOrcFileSorting(Supplier dataSourceSup public static boolean checkParquetFileSorting(TrinoInputFile inputFile, String sortColumnName) { ParquetMetadata parquetMetadata; + List blocks; try { parquetMetadata = MetadataReader.readFooter( new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats()), Optional.empty()); + blocks = parquetMetadata.getBlocks(); } catch (IOException e) { throw new UncheckedIOException(e); } Comparable previousMax = null; - verify(parquetMetadata.getBlocks().size() > 1, "Test must produce at least two row groups"); - for (BlockMetadata blockMetaData : parquetMetadata.getBlocks()) { + verify(blocks.size() > 1, "Test must produce at least two row groups"); + for (BlockMetadata blockMetaData : blocks) { ColumnChunkMetadata columnMetadata = blockMetaData.columns().stream() .filter(column -> getOnlyElement(column.getPath().iterator()).equalsIgnoreCase(sortColumnName)) .collect(onlyElement());
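
Reviewer note: below is a minimal usage sketch of the split-aware footer API introduced in this change. It relies only on methods visible in the diff (MetadataReader.readFooter, ParquetMetadata.getBlocks(splitStart, splitLength), BlockMetadata); the class name SplitAwareFooterExample, the helper name readRowGroupsForSplit, and the surrounding wiring are illustrative assumptions, not part of the change itself.

    import io.trino.parquet.ParquetDataSource;
    import io.trino.parquet.metadata.BlockMetadata;
    import io.trino.parquet.metadata.ParquetMetadata;
    import io.trino.parquet.reader.MetadataReader;

    import java.io.IOException;
    import java.util.List;
    import java.util.Optional;

    final class SplitAwareFooterExample
    {
        private SplitAwareFooterExample() {}

        // Hypothetical helper: decode the footer once, then materialize only the row groups
        // relevant to a split. A row group is kept when its file_offset falls inside
        // [splitStart, splitStart + splitLength); row groups written without a file_offset
        // (older writers) are never skipped, matching ParquetMetadata.getBlocks above.
        static List<BlockMetadata> readRowGroupsForSplit(ParquetDataSource dataSource, long splitStart, long splitLength)
                throws IOException
        {
            ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
            // getBlocks now converts the thrift RowGroup structs lazily, so it can throw
            // ParquetCorruptionException (an IOException) instead of failing inside readFooter.
            return parquetMetadata.getBlocks(splitStart, splitLength);
        }
    }

The same split containment check is still applied in PredicateUtils.getFilteredRowGroups via block.getStartingPos(), and ParquetWriter now stamps RowGroup.file_offset at flush time so newly written files can be pruned this way; the 227 -> 229 byte bumps in the Delta Lake cache tests are presumably the extra serialized file_offset field in each row group.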