diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java b/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java index 13de4ecc01fc..56091d962a08 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java @@ -90,6 +90,12 @@ public String toString() public List getBlocks() throws ParquetCorruptionException + { + return getBlocks(0, Long.MAX_VALUE); + } + + public List getBlocks(long splitStart, long splitLength) + throws ParquetCorruptionException { List schema = parquetMetadata.getSchema(); validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty"); @@ -99,6 +105,14 @@ public List getBlocks() List rowGroups = parquetMetadata.getRow_groups(); if (rowGroups != null) { for (RowGroup rowGroup : rowGroups) { + if (rowGroup.isSetFile_offset()) { + long rowGroupStart = rowGroup.getFile_offset(); + boolean splitContainsRowGroup = splitStart <= rowGroupStart && rowGroupStart < splitStart + splitLength; + if (!splitContainsRowGroup) { + continue; + } + } + List columns = rowGroup.getColumns(); validateParquet(!columns.isEmpty(), dataSourceId, "No columns in row group: %s", rowGroup); String filePath = columns.get(0).getFile_path(); diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java index 6901bb23a4e6..978e915a7a56 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java @@ -27,6 +27,7 @@ import io.trino.parquet.ParquetReaderOptions; import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ColumnChunkMetadata; +import io.trino.parquet.metadata.ParquetMetadata; import io.trino.parquet.metadata.PrunedBlockMetadata; import io.trino.parquet.reader.RowGroupInfo; import io.trino.spi.predicate.TupleDomain; @@ -183,7 +184,7 @@ public static List getFilteredRowGroups( long splitStart, long splitLength, ParquetDataSource dataSource, - List blocksMetaData, + ParquetMetadata parquetMetadata, List> parquetTupleDomains, List parquetPredicates, Map, ColumnDescriptor> descriptorsByPath, @@ -194,7 +195,7 @@ public static List getFilteredRowGroups( { long fileRowCount = 0; ImmutableList.Builder rowGroupInfoBuilder = ImmutableList.builder(); - for (BlockMetadata block : blocksMetaData) { + for (BlockMetadata block : parquetMetadata.getBlocks(splitStart, splitLength)) { long blockStart = block.getStartingPos(); boolean splitContainsBlock = splitStart <= blockStart && blockStart < splitStart + splitLength; if (splitContainsBlock) { diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java b/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java index aeda237c642f..0528adcd1166 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java @@ -155,7 +155,7 @@ public static ParquetReader createParquetReader( 0, input.getEstimatedSize(), input, - parquetMetadata.getBlocks(), + parquetMetadata, ImmutableList.of(parquetTupleDomain), ImmutableList.of(parquetPredicate), descriptorsByPath, diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetReader.java index 00ecd8388857..db8b4225770a 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetReader.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetReader.java @@ -20,6 +20,7 @@ import io.trino.memory.context.AggregatedMemoryContext; import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetReaderOptions; +import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ParquetMetadata; import io.trino.parquet.writer.ParquetWriterOptions; import io.trino.spi.Page; @@ -186,6 +187,44 @@ public void testBackwardsCompatibleRepeatedPrimitiveFieldDefinedAsPrimitive() .isInstanceOf(TrinoException.class); } + @Test + void testReadMetadataWithSplitOffset() + throws IOException + { + // Write a file with 100 rows per row-group + List columnNames = ImmutableList.of("columna", "columnb"); + List types = ImmutableList.of(INTEGER, BIGINT); + + ParquetDataSource dataSource = new TestingParquetDataSource( + writeParquetFile( + ParquetWriterOptions.builder() + .setMaxBlockSize(DataSize.ofBytes(1000)) + .build(), + types, + columnNames, + generateInputPages(types, 100, 5)), + new ParquetReaderOptions()); + + // Read both columns, 1 row group + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + List columnBlocks = parquetMetadata.getBlocks(0, 800); + assertThat(columnBlocks.size()).isEqualTo(1); + assertThat(columnBlocks.getFirst().columns().size()).isEqualTo(2); + assertThat(columnBlocks.getFirst().rowCount()).isEqualTo(100); + + // Read both columns, half row groups + parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + columnBlocks = parquetMetadata.getBlocks(0, 2500); + assertThat(columnBlocks.stream().allMatch(block -> block.columns().size() == 2)).isTrue(); + assertThat(columnBlocks.stream().mapToLong(BlockMetadata::rowCount).sum()).isEqualTo(300); + + // Read both columns, all row groups + parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + columnBlocks = parquetMetadata.getBlocks(); + assertThat(columnBlocks.stream().allMatch(block -> block.columns().size() == 2)).isTrue(); + assertThat(columnBlocks.stream().mapToLong(BlockMetadata::rowCount).sum()).isEqualTo(500); + } + private void testReadingOldParquetFiles(File file, List columnNames, Type columnType, List expectedValues) throws IOException { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java index dc072f0967a0..f364a9e18739 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java @@ -253,7 +253,7 @@ public static ReaderPageSource createPageSource( start, length, dataSource, - parquetMetadata.getBlocks(), + parquetMetadata, parquetTupleDomains, parquetPredicates, descriptorsByPath, diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java index 3bb7b1c42a79..83d62c9ab160 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java @@ -218,7 +218,7 @@ private static ConnectorPageSource createPageSource( start, length, dataSource, - parquetMetadata.getBlocks(), + parquetMetadata, ImmutableList.of(parquetTupleDomain), ImmutableList.of(parquetPredicate), descriptorsByPath, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index 4b5abd936d98..1911d5d4426d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -891,7 +891,7 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource( start, length, dataSource, - parquetMetadata.getBlocks(), + parquetMetadata, ImmutableList.of(parquetTupleDomain), ImmutableList.of(parquetPredicate), descriptorsByPath,