Is it viable to implement a max-partitions-per-scan
option in the Iceberg connector, like the Hive connector's?
#20714
-
We need a way to prevent overly heavy scan queries on Iceberg tables, as we had on Hive tables with the `max-partitions-per-scan` option. I think it is impossible to transplant the scan-partitions constraint logic from the Hive connector to the Iceberg connector as-is, but I guess it may still be possible to implement a scan-partitions constraint in some other way. That said, people who have a better understanding of Trino and Iceberg than I do might have had reasons for not implementing this option. @findepi , @wendigo , @raunaqmorarka , @mosabua , @findinpath
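For reference, the Hive connector's limit is a catalog configuration property; a minimal sketch of how it is typically set (the value shown here is illustrative, not a recommendation):

```properties
# etc/catalog/hive.properties (illustrative value)
hive.max-partitions-per-scan=100000
```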
Replies: 2 comments
-
I implemented some custom logic (TBLPROPERTIES example):

```java
// (snippet of custom logic) ...
Splitter.on(",").splitToStream(table.getStorageProperties().get(QUERY_PARTITION_FILTER_APPROXIMATE_MAX_SCAN_FIELD_TO_DAYS_MAP_KEY))
        .map(String::trim)
        .forEach(fieldKeyValuePairStr -> {
            String[] fieldKeyValuePair = fieldKeyValuePairStr.split(":");
            mandatoryFilterPartitionMaxScanFieldToTsMap.put(
                    fieldKeyValuePair[0].trim(),
                    Integer.parseInt(fieldKeyValuePair[1].trim()) * QUERY_PARTITION_FILTER_TIMESTAMPS_PER_DAY);
        });
// ...
SortedRangeSet currentValueSet = (SortedRangeSet) valueDomain.getValues();
currentValueSet.getRanges().getOrderedRanges().forEach(range -> {
    range.getLowValue().ifPresentOrElse(
            lowValue -> {
                if (range.isSingleValue()) {
                    mandatoryFilterPartitionMaxScanFieldToTsMap.put(
                            columnName,
                            mandatoryFilterPartitionMaxScanFieldToTsMap.get(columnName) - QUERY_PARTITION_FILTER_TIMESTAMPS_PER_DAY);
                }
                else {
                    long lowTimestamp = ((LongTimestampWithTimeZone) lowValue).getEpochMillis();
                    validateMandatoryPartitionScanCountsWithLeftLowValue(lowTimestamp, columnName, range, mandatoryFilterPartitionMaxScanFieldToTsMap, requiredOption, table);
                }
            },
            // ...
```
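The snippets above elide the surrounding Trino SPI plumbing. As a dependency-free sketch of the same idea, the code below parses a per-column `field:maxDays` property and charges each predicate range against that budget. The class, method names, and property format are my assumptions for illustration, not Trino or Iceberg APIs:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a per-column "scan budget" derived from a table
// property such as "event_ts:7,load_dt:30", charged by predicate ranges.
public class PartitionScanBudget
{
    static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

    // Parse "event_ts:7,load_dt:30" into a column -> budget-in-millis map.
    static Map<String, Long> parseBudgets(String property)
    {
        Map<String, Long> budgets = new HashMap<>();
        for (String pair : property.split(",")) {
            String[] kv = pair.trim().split(":");
            budgets.put(kv[0].trim(), Long.parseLong(kv[1].trim()) * MILLIS_PER_DAY);
        }
        return budgets;
    }

    // Deduct the width of a predicate range [lowMillis, highMillis] on `column`;
    // a single value counts as one day, mirroring the isSingleValue() branch above.
    static void charge(Map<String, Long> budgets, String column, long lowMillis, long highMillis)
    {
        long cost = (lowMillis == highMillis) ? MILLIS_PER_DAY : (highMillis - lowMillis);
        long remaining = budgets.get(column) - cost;
        if (remaining < 0) {
            throw new IllegalArgumentException("query scans too many partitions on " + column);
        }
        budgets.put(column, remaining);
    }
}
```

With a 7-day budget on `event_ts`, a 3-day range leaves 4 days of budget, and a further 5-day range would be rejected.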
Unlike Hive, in Iceberg there isn't a separate metadata call for fetching the partitions of a table that this functionality could be built on top of. Since we enumerate splits lazily and get them through the Iceberg library, we find out about partitions at execution time, as the files are processed.
The number of partitions alone doesn't seem like a great metric for blocking queries: it doesn't take into account the amount of data per partition, the number of columns queried, or the effect of predicate pushdown in ORC/Parquet on the worker. It also ignores unpartitioned tables completely. I think `query.max-scan-physical-bytes` is a good substitute here. Maybe we can add an SPI in ConnectorMetadata (D…
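For anyone looking for the suggested substitute, it is a Trino coordinator configuration property; a minimal sketch (the limit value is illustrative):

```properties
# etc/config.properties (illustrative limit)
query.max-scan-physical-bytes=1TB
```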