From 55480209ba53248a3d31f632de7b3131c49ccf8c Mon Sep 17 00:00:00 2001 From: Socrates Date: Wed, 15 Jan 2025 15:25:10 +0800 Subject: [PATCH] [cherry-pick](branch-2.1) impl partition pruning in runtime filer #47025 --- be/src/vec/exec/scan/vfile_scanner.cpp | 132 +++++++++++++++++- be/src/vec/exec/scan/vfile_scanner.h | 8 +- ..._hive_runtime_filter_partition_pruning.out | 25 ++++ ...ve_runtime_filter_partition_pruning.groovy | 71 ++++++++++ 4 files changed, 229 insertions(+), 7 deletions(-) create mode 100644 regression-test/data/external_table_p0/hive/test_hive_runtime_filter_partition_pruning.out create mode 100644 regression-test/suites/external_table_p0/hive/test_hive_runtime_filter_partition_pruning.groovy diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 407de80c9fb55e..087a076881356d 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -29,16 +30,19 @@ #include #include #include +#include #include #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "common/logging.h" #include "common/object_pool.h" +#include "common/status.h" #include "io/cache/block/block_file_cache_profile.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "runtime/types.h" +#include "util/runtime_profile.h" #include "vec/aggregate_functions/aggregate_function.h" #include "vec/columns/column.h" #include "vec/columns/column_nullable.h" @@ -167,6 +171,10 @@ Status VFileScanner::prepare( _file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT); _has_fully_rf_file_counter = ADD_COUNTER(_parent->_scanner_profile, "HasFullyRfFileNumber", TUnit::UNIT); + _runtime_filter_partition_pruning_timer = ADD_TIMER( + _parent->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime"); + _runtime_filter_partition_pruned_range_counter = ADD_COUNTER( + _parent->scanner_profile(), "RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT); } else { _get_block_timer = ADD_TIMER(_local_state->scanner_profile(), "FileScannerGetBlockTime"); _open_reader_timer = @@ -187,6 +195,11 @@ Status VFileScanner::prepare( _file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT); _has_fully_rf_file_counter = ADD_COUNTER(_local_state->scanner_profile(), "HasFullyRfFileNumber", TUnit::UNIT); + _runtime_filter_partition_pruning_timer = ADD_TIMER( + _local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime"); + _runtime_filter_partition_pruned_range_counter = + ADD_COUNTER(_local_state->scanner_profile(), "RuntimeFilterPartitionPrunedRangeNum", + TUnit::UNIT); } _file_cache_statistics.reset(new io::FileCacheStatistics()); @@ -226,6 +239,92 @@ Status VFileScanner::prepare( return Status::OK(); } +void VFileScanner::_init_runtime_filter_partition_pruning_ctxs() { + if (_partition_slot_index_map.empty()) { + return; + } + _runtime_filter_partition_pruning_ctxs.clear(); + for (auto& conjunct : _conjuncts) { + auto impl = conjunct->root()->get_impl(); + // If impl is not null, which means this a conjuncts from runtime filter. + auto expr = impl ? impl : conjunct->root(); + if (expr->get_num_children() > 0 && expr->get_child(0)->is_slot_ref()) { + const auto* slot_ref = static_cast(expr->get_child(0).get()); + if (_partition_slot_index_map.find(slot_ref->slot_id()) != + _partition_slot_index_map.end()) { + // If the slot is partition column, add it to runtime filter partition pruning ctxs. + _runtime_filter_partition_pruning_ctxs.emplace_back(conjunct); + } + } + } +} + +Status VFileScanner::_process_runtime_filters_partition_pruning(bool& can_filter_all) { + SCOPED_TIMER(_runtime_filter_partition_pruning_timer); + if (_runtime_filter_partition_pruning_ctxs.empty() || _partition_col_descs.empty()) { + return Status::OK(); + } + size_t partition_value_column_size = 1; + + // 1. Get partition key values to string columns. + std::unordered_map parititon_slot_id_to_column; + for (auto const& partition_col_desc : _partition_col_descs) { + const auto& [partition_value, partition_slot_desc] = partition_col_desc.second; + auto test_serde = partition_slot_desc->get_data_type_ptr()->get_serde(); + auto partition_value_column = partition_slot_desc->get_data_type_ptr()->create_column(); + auto* col_ptr = static_cast(partition_value_column.get()); + Slice slice(partition_value.data(), partition_value.size()); + int num_deserialized = 0; + RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json( + *col_ptr, slice, partition_value_column_size, &num_deserialized, {})); + parititon_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column); + } + + // 2. Build a temp block from the partition column, then execute conjuncts and filter block. + // 2.1 Build a temp block from the partition column to match the conjuncts executing. + Block temp_block; + int index = 0; + bool first_cloumn_filled = false; + for (auto const* slot_desc : _real_tuple_desc->slots()) { + if (!slot_desc->need_materialize()) { + // should be ignored from reading + continue; + } + if (parititon_slot_id_to_column.find(slot_desc->id()) != + parititon_slot_id_to_column.end()) { + auto data_type = slot_desc->get_data_type_ptr(); + auto partition_value_column = std::move(parititon_slot_id_to_column[slot_desc->id()]); + if (data_type->is_nullable()) { + temp_block.insert({ColumnNullable::create( + std::move(partition_value_column), + ColumnUInt8::create(partition_value_column_size, 0)), + data_type, ""}); + } else { + temp_block.insert({std::move(partition_value_column), data_type, ""}); + } + if (index == 0) { + first_cloumn_filled = true; + } + } else { + temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + index++; + } + + // 2.2 Execute conjuncts. + if (!first_cloumn_filled) { + // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0 + // The following process may be tricky and time-consuming, but we have no other way. + temp_block.get_by_position(0).column->assume_mutable()->resize(partition_value_column_size); + } + IColumn::Filter result_filter(temp_block.rows(), 1); + RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_pruning_ctxs, nullptr, + &temp_block, &result_filter, &can_filter_all)); + return Status::OK(); +} + Status VFileScanner::_process_conjuncts_for_dict_filter() { _slot_id_to_filter_conjuncts.clear(); _not_single_slot_filter_conjuncts.clear(); @@ -289,6 +388,7 @@ Status VFileScanner::open(RuntimeState* state) { RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range)); if (_first_scan_range) { RETURN_IF_ERROR(_init_expr_ctxes()); + _init_runtime_filter_partition_pruning_ctxs(); } else { // there's no scan range in split source. stop scanner directly. _scanner_eof = true; @@ -771,6 +871,24 @@ Status VFileScanner::_get_next_reader() { const TFileRangeDesc& range = _current_range; _current_range_path = range.path; + if (!_partition_slot_descs.empty()) { + // we need get partition columns first for runtime filter partition pruning + RETURN_IF_ERROR(_generate_parititon_columns()); + if (_push_down_conjuncts.size() < _conjuncts.size()) { + // there are new runtime filters, need to re-init runtime filter partition pruning ctxs + _init_runtime_filter_partition_pruning_ctxs(); + } + + bool can_filter_all = false; + RETURN_IF_ERROR(_process_runtime_filters_partition_pruning(can_filter_all)); + if (can_filter_all) { + // this range can be filtered out by runtime filter partition pruning + // so we need to skip this range + COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1); + continue; + } + } + // create reader for specific format Status init_status; TFileFormatType::type format_type = _params->format_type; @@ -1019,7 +1137,8 @@ Status VFileScanner::_get_next_reader() { _missing_cols.clear(); RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols)); _cur_reader->set_push_down_agg_type(_get_push_down_agg_type()); - RETURN_IF_ERROR(_generate_fill_columns()); + RETURN_IF_ERROR(_generate_missing_columns()); + RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs)); if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) { fmt::memory_buffer col_buf; for (auto& col : _missing_cols) { @@ -1049,10 +1168,8 @@ Status VFileScanner::_get_next_reader() { return Status::OK(); } -Status VFileScanner::_generate_fill_columns() { +Status VFileScanner::_generate_parititon_columns() { _partition_col_descs.clear(); - _missing_col_descs.clear(); - const TFileRangeDesc& range = _current_range; if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) { for (const auto& slot_desc : _partition_slot_descs) { @@ -1073,7 +1190,11 @@ Status VFileScanner::_generate_fill_columns() { } } } + return Status::OK(); +} +Status VFileScanner::_generate_missing_columns() { + _missing_col_descs.clear(); if (!_missing_cols.empty()) { for (auto slot_desc : _real_tuple_desc->slots()) { if (!slot_desc->is_materialized()) { @@ -1091,8 +1212,7 @@ Status VFileScanner::_generate_fill_columns() { _missing_col_descs.emplace(slot_desc->col_name(), it->second); } } - - return _cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs); + return Status::OK(); } Status VFileScanner::_init_expr_ctxes() { diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index cf1ea97f21b0f0..d5962d7d167cd2 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -165,6 +165,7 @@ class VFileScanner : public VScanner { Block _src_block; VExprContextSPtrs _push_down_conjuncts; + VExprContextSPtrs _runtime_filter_partition_pruning_ctxs; std::unique_ptr _file_cache_statistics; std::unique_ptr _io_ctx; @@ -181,10 +182,12 @@ class VFileScanner : public VScanner { RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr; RuntimeProfile::Counter* _pre_filter_timer = nullptr; RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr; + RuntimeProfile::Counter* _runtime_filter_partition_pruning_timer = nullptr; RuntimeProfile::Counter* _empty_file_counter = nullptr; RuntimeProfile::Counter* _not_found_file_counter = nullptr; RuntimeProfile::Counter* _file_counter = nullptr; RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr; + RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr; const std::unordered_map* _col_name_to_slot_id = nullptr; // single slot filter conjuncts @@ -212,8 +215,11 @@ class VFileScanner : public VScanner { Status _convert_to_output_block(Block* block); Status _truncate_char_or_varchar_columns(Block* block); void _truncate_char_or_varchar_column(Block* block, int idx, int len); - Status _generate_fill_columns(); Status _handle_dynamic_block(Block* block); + Status _generate_parititon_columns(); + Status _generate_missing_columns(); + void _init_runtime_filter_partition_pruning_ctxs(); + Status _process_runtime_filters_partition_pruning(bool& is_partition_pruning); Status _process_conjuncts_for_dict_filter(); Status _process_late_arrival_conjuncts(); void _get_slot_ids(VExpr* expr, std::vector* slot_ids); diff --git a/regression-test/data/external_table_p0/hive/test_hive_runtime_filter_partition_pruning.out b/regression-test/data/external_table_p0/hive/test_hive_runtime_filter_partition_pruning.out new file mode 100644 index 00000000000000..e14a93ced0308d --- /dev/null +++ b/regression-test/data/external_table_p0/hive/test_hive_runtime_filter_partition_pruning.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !runtime_filter_partition_pruning1 -- +3994 + +-- !runtime_filter_partition_pruning2 -- +4990 + +-- !runtime_filter_partition_pruning3 -- +1999 + +-- !runtime_filter_partition_pruning4 -- +2994 + +-- !runtime_filter_partition_pruning1 -- +3994 + +-- !runtime_filter_partition_pruning2 -- +4990 + +-- !runtime_filter_partition_pruning3 -- +1999 + +-- !runtime_filter_partition_pruning4 -- +2994 + diff --git a/regression-test/suites/external_table_p0/hive/test_hive_runtime_filter_partition_pruning.groovy b/regression-test/suites/external_table_p0/hive/test_hive_runtime_filter_partition_pruning.groovy new file mode 100644 index 00000000000000..5b071195408b29 --- /dev/null +++ b/regression-test/suites/external_table_p0/hive/test_hive_runtime_filter_partition_pruning.groovy @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hive_runtime_filter_partition_pruning", "p0,external,hive,external_docker,external_docker_hive") { + def test_runtime_filter_partition_pruning = { + qt_runtime_filter_partition_pruning1 """ + select count(*) from partition_table where nation = + (select nation from partition_table + group by nation having count(*) > 0 + order by nation desc limit 1); + """ + qt_runtime_filter_partition_pruning2 """ + select count(*) from partition_table where nation in + (select nation from partition_table + group by nation having count(*) > 0 + order by nation desc limit 2); + """ + qt_runtime_filter_partition_pruning3 """ + select count(*) from partition_table where city = + (select city from partition_table + group by city having count(*) > 0 + order by city desc limit 1); + """ + qt_runtime_filter_partition_pruning4 """ + select count(*) from partition_table where city in + (select city from partition_table + group by city having count(*) > 0 + order by city desc limit 2); + """ + } + + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("diable Hive test.") + return; + } + + for (String hivePrefix : ["hive2", "hive3"]) { + try { + String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") + String catalog_name = "${hivePrefix}_test_partitions" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog if not exists ${catalog_name} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}' + );""" + sql """use `${catalog_name}`.`default`""" + + test_runtime_filter_partition_pruning() + + } finally { + } + } +} +