Skip to content

Commit

Permalink
[cherry-pick](branch-2.1) impl partition pruning in runtime filer apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
suxiaogang223 authored and morningman committed Jan 20, 2025
1 parent 443e87e commit 5548020
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 7 deletions.
132 changes: 126 additions & 6 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,27 @@
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <glog/logging.h>

#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <iterator>
#include <map>
#include <ostream>
#include <tuple>
#include <unordered_map>
#include <utility>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "io/cache/block/block_file_cache_profile.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "util/runtime_profile.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
Expand Down Expand Up @@ -167,6 +171,10 @@ Status VFileScanner::prepare(
_file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT);
_has_fully_rf_file_counter =
ADD_COUNTER(_parent->_scanner_profile, "HasFullyRfFileNumber", TUnit::UNIT);
_runtime_filter_partition_pruning_timer = ADD_TIMER(
_parent->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime");
_runtime_filter_partition_pruned_range_counter = ADD_COUNTER(
_parent->scanner_profile(), "RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT);
} else {
_get_block_timer = ADD_TIMER(_local_state->scanner_profile(), "FileScannerGetBlockTime");
_open_reader_timer =
Expand All @@ -187,6 +195,11 @@ Status VFileScanner::prepare(
_file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT);
_has_fully_rf_file_counter =
ADD_COUNTER(_local_state->scanner_profile(), "HasFullyRfFileNumber", TUnit::UNIT);
_runtime_filter_partition_pruning_timer = ADD_TIMER(
_local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime");
_runtime_filter_partition_pruned_range_counter =
ADD_COUNTER(_local_state->scanner_profile(), "RuntimeFilterPartitionPrunedRangeNum",
TUnit::UNIT);
}

_file_cache_statistics.reset(new io::FileCacheStatistics());
Expand Down Expand Up @@ -226,6 +239,92 @@ Status VFileScanner::prepare(
return Status::OK();
}

void VFileScanner::_init_runtime_filter_partition_pruning_ctxs() {
if (_partition_slot_index_map.empty()) {
return;
}
_runtime_filter_partition_pruning_ctxs.clear();
for (auto& conjunct : _conjuncts) {
auto impl = conjunct->root()->get_impl();
// If impl is not null, which means this a conjuncts from runtime filter.
auto expr = impl ? impl : conjunct->root();
if (expr->get_num_children() > 0 && expr->get_child(0)->is_slot_ref()) {
const auto* slot_ref = static_cast<const VSlotRef*>(expr->get_child(0).get());
if (_partition_slot_index_map.find(slot_ref->slot_id()) !=
_partition_slot_index_map.end()) {
// If the slot is partition column, add it to runtime filter partition pruning ctxs.
_runtime_filter_partition_pruning_ctxs.emplace_back(conjunct);
}
}
}
}

// Evaluates the partition-pruning conjuncts against the partition values of
// the current range.
//
// The partition values held in _partition_col_descs are deserialized into
// one-row columns, placed into a temporary block laid out like
// _real_tuple_desc (materialized slots only), and the conjuncts in
// _runtime_filter_partition_pruning_ctxs are executed on that block.
//
// @param can_filter_all  written by VExprContext::execute_conjuncts; left
//                        untouched when there is nothing to evaluate.
// @return the first error from deserialization or conjunct execution,
//         otherwise Status::OK().
Status VFileScanner::_process_runtime_filters_partition_pruning(bool& can_filter_all) {
    SCOPED_TIMER(_runtime_filter_partition_pruning_timer);
    if (_runtime_filter_partition_pruning_ctxs.empty() || _partition_col_descs.empty()) {
        return Status::OK();
    }

    // Every partition column in the temporary block carries exactly one row.
    size_t rows_per_column = 1;

    // Step 1: turn each textual partition value into a typed one-row column,
    // keyed by the slot id of its partition column.
    std::unordered_map<SlotId, MutableColumnPtr> slot_id_to_partition_column;
    for (auto const& desc_entry : _partition_col_descs) {
        const auto& [raw_value, partition_slot] = desc_entry.second;
        auto slot_type = partition_slot->get_data_type_ptr();
        auto serde = slot_type->get_serde();
        auto value_column = slot_type->create_column();
        auto* raw_column = static_cast<IColumn*>(value_column.get());
        Slice value_slice(raw_value.data(), raw_value.size());
        int deserialized_rows = 0;
        RETURN_IF_ERROR(serde->deserialize_column_from_fixed_json(
                *raw_column, value_slice, rows_per_column, &deserialized_rows, {}));
        slot_id_to_partition_column[partition_slot->id()] = std::move(value_column);
    }

    // Step 2: assemble a temporary block whose column layout matches what the
    // conjuncts expect, then run the conjuncts on it.
    // Step 2.1: one column per materialized slot, in tuple order.
    Block temp_block;
    int materialized_idx = 0;
    bool first_column_has_row = false;
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
        if (!slot_desc->need_materialize()) {
            // Slots that are not read do not occupy a position in the block.
            continue;
        }

        auto found = slot_id_to_partition_column.find(slot_desc->id());
        if (found == slot_id_to_partition_column.end()) {
            // Non-partition slot: an empty placeholder column keeps positions aligned.
            temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
                                                    slot_desc->get_data_type_ptr(),
                                                    slot_desc->col_name()));
            materialized_idx++;
            continue;
        }

        auto data_type = slot_desc->get_data_type_ptr();
        auto partition_value_column = std::move(found->second);
        if (data_type->is_nullable()) {
            // Pair the value column with an all-zero null map of the same length.
            temp_block.insert({ColumnNullable::create(std::move(partition_value_column),
                                                      ColumnUInt8::create(rows_per_column, 0)),
                               data_type, ""});
        } else {
            temp_block.insert({std::move(partition_value_column), data_type, ""});
        }
        if (materialized_idx == 0) {
            first_column_has_row = true;
        }
        materialized_idx++;
    }

    // Step 2.2: execute the conjuncts.
    if (!first_column_has_row) {
        // VExprContext.execute only performs filtering when block->rows() > 0, so
        // the first column is resized to make the block report a row. This may be
        // tricky and time-consuming, but there is no other way.
        temp_block.get_by_position(0).column->assume_mutable()->resize(rows_per_column);
    }
    IColumn::Filter result_filter(temp_block.rows(), 1);
    RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_pruning_ctxs, nullptr,
                                                    &temp_block, &result_filter, &can_filter_all));
    return Status::OK();
}

Status VFileScanner::_process_conjuncts_for_dict_filter() {
_slot_id_to_filter_conjuncts.clear();
_not_single_slot_filter_conjuncts.clear();
Expand Down Expand Up @@ -289,6 +388,7 @@ Status VFileScanner::open(RuntimeState* state) {
RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
if (_first_scan_range) {
RETURN_IF_ERROR(_init_expr_ctxes());
_init_runtime_filter_partition_pruning_ctxs();
} else {
// there's no scan range in split source. stop scanner directly.
_scanner_eof = true;
Expand Down Expand Up @@ -771,6 +871,24 @@ Status VFileScanner::_get_next_reader() {
const TFileRangeDesc& range = _current_range;
_current_range_path = range.path;

if (!_partition_slot_descs.empty()) {
// we need get partition columns first for runtime filter partition pruning
RETURN_IF_ERROR(_generate_parititon_columns());
if (_push_down_conjuncts.size() < _conjuncts.size()) {
// there are new runtime filters, need to re-init runtime filter partition pruning ctxs
_init_runtime_filter_partition_pruning_ctxs();
}

bool can_filter_all = false;
RETURN_IF_ERROR(_process_runtime_filters_partition_pruning(can_filter_all));
if (can_filter_all) {
// this range can be filtered out by runtime filter partition pruning
// so we need to skip this range
COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
continue;
}
}

// create reader for specific format
Status init_status;
TFileFormatType::type format_type = _params->format_type;
Expand Down Expand Up @@ -1019,7 +1137,8 @@ Status VFileScanner::_get_next_reader() {
_missing_cols.clear();
RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols));
_cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
RETURN_IF_ERROR(_generate_fill_columns());
RETURN_IF_ERROR(_generate_missing_columns());
RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
fmt::memory_buffer col_buf;
for (auto& col : _missing_cols) {
Expand Down Expand Up @@ -1049,10 +1168,8 @@ Status VFileScanner::_get_next_reader() {
return Status::OK();
}

Status VFileScanner::_generate_fill_columns() {
Status VFileScanner::_generate_parititon_columns() {
_partition_col_descs.clear();
_missing_col_descs.clear();

const TFileRangeDesc& range = _current_range;
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
for (const auto& slot_desc : _partition_slot_descs) {
Expand All @@ -1073,7 +1190,11 @@ Status VFileScanner::_generate_fill_columns() {
}
}
}
return Status::OK();
}

Status VFileScanner::_generate_missing_columns() {
_missing_col_descs.clear();
if (!_missing_cols.empty()) {
for (auto slot_desc : _real_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
Expand All @@ -1091,8 +1212,7 @@ Status VFileScanner::_generate_fill_columns() {
_missing_col_descs.emplace(slot_desc->col_name(), it->second);
}
}

return _cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs);
return Status::OK();
}

Status VFileScanner::_init_expr_ctxes() {
Expand Down
8 changes: 7 additions & 1 deletion be/src/vec/exec/scan/vfile_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class VFileScanner : public VScanner {
Block _src_block;

VExprContextSPtrs _push_down_conjuncts;
VExprContextSPtrs _runtime_filter_partition_pruning_ctxs;

std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
std::unique_ptr<io::IOContext> _io_ctx;
Expand All @@ -181,10 +182,12 @@ class VFileScanner : public VScanner {
RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
RuntimeProfile::Counter* _pre_filter_timer = nullptr;
RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
RuntimeProfile::Counter* _runtime_filter_partition_pruning_timer = nullptr;
RuntimeProfile::Counter* _empty_file_counter = nullptr;
RuntimeProfile::Counter* _not_found_file_counter = nullptr;
RuntimeProfile::Counter* _file_counter = nullptr;
RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr;
RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr;

const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
// single slot filter conjuncts
Expand Down Expand Up @@ -212,8 +215,11 @@ class VFileScanner : public VScanner {
Status _convert_to_output_block(Block* block);
Status _truncate_char_or_varchar_columns(Block* block);
void _truncate_char_or_varchar_column(Block* block, int idx, int len);
Status _generate_fill_columns();
Status _handle_dynamic_block(Block* block);
Status _generate_parititon_columns();
Status _generate_missing_columns();
void _init_runtime_filter_partition_pruning_ctxs();
Status _process_runtime_filters_partition_pruning(bool& is_partition_pruning);
Status _process_conjuncts_for_dict_filter();
Status _process_late_arrival_conjuncts();
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !runtime_filter_partition_pruning1 --
3994

-- !runtime_filter_partition_pruning2 --
4990

-- !runtime_filter_partition_pruning3 --
1999

-- !runtime_filter_partition_pruning4 --
2994

-- !runtime_filter_partition_pruning1 --
3994

-- !runtime_filter_partition_pruning2 --
4990

-- !runtime_filter_partition_pruning3 --
1999

-- !runtime_filter_partition_pruning4 --
2994

Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression suite for runtime-filter partition pruning on Hive external tables.
//
// For each Hive flavour ("hive2", "hive3") the suite (re)creates an HMS catalog,
// switches to its `default` database and runs four count(*) queries on
// `partition_table`. Each query filters a column (`nation` or `city`) with a
// subquery over the same table, using either `=` or `in`. Results are compared
// by the qt_ actions against the recorded output.
suite("test_hive_runtime_filter_partition_pruning", "p0,external,hive,external_docker,external_docker_hive") {
    // The four checked queries; they run against whichever catalog/database is
    // currently selected, so the closure is invoked once per Hive flavour below.
    def test_runtime_filter_partition_pruning = {
        // `nation` filtered by a single value.
        qt_runtime_filter_partition_pruning1 """
            select count(*) from partition_table where nation =
                (select nation from partition_table
                group by nation having count(*) > 0
                order by nation desc limit 1);
        """
        // `nation` filtered by an IN list of two values.
        qt_runtime_filter_partition_pruning2 """
            select count(*) from partition_table where nation in
                (select nation from partition_table
                group by nation having count(*) > 0
                order by nation desc limit 2);
        """
        // `city` filtered by a single value.
        qt_runtime_filter_partition_pruning3 """
            select count(*) from partition_table where city =
                (select city from partition_table
                group by city having count(*) > 0
                order by city desc limit 1);
        """
        // `city` filtered by an IN list of two values.
        qt_runtime_filter_partition_pruning4 """
            select count(*) from partition_table where city in
                (select city from partition_table
                group by city having count(*) > 0
                order by city desc limit 2);
        """
    }

    // The suite is a no-op unless Hive tests are explicitly enabled in the config.
    String enabled = context.config.otherConfigs.get("enableHiveTest")
    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
        logger.info("disable Hive test.")
        return;
    }

    for (String hivePrefix : ["hive2", "hive3"]) {
        // Connection settings are read from the regression config for this flavour.
        String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
        String catalog_name = "${hivePrefix}_test_partitions"
        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")

        // Start from a fresh catalog definition.
        sql """drop catalog if exists ${catalog_name}"""
        sql """create catalog if not exists ${catalog_name} properties (
            "type"="hms",
            'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
        );"""
        sql """use `${catalog_name}`.`default`"""

        test_runtime_filter_partition_pruning()
    }
}

0 comments on commit 5548020

Please sign in to comment.