Skip to content

Commit

Permalink
bug修复及功能支持 (#247)
Browse files Browse the repository at this point in the history
* feat: force index ignore not ready index

* feat: sort use index add pk fields

* fix: check_condition_again_for_global_index

* fix: 0x00 literal string bugs

* feat: join reorder for child

* fix: sub query use force index

* fix: prepare for insert ... subquery

* fix: core dump when analyze empty table

* fix: modify column bug

* feat: information_schema.TABLES show comment

* feat: keep ttl for update

* feat: show_cost don't query store

* feat: reject modify data type when change is lossy

* fix: set_user_variable

* fix: limit下推bug

* fix: instance dead choose other peer

* fix: core dump when restart meta

* fix: limit should >= 0

* fix: global update use row ttl

---------

Co-authored-by: 王勇 <[email protected]>
  • Loading branch information
cyz-2023 and wy1433 authored Nov 18, 2024
1 parent 551f07d commit 5638a1c
Show file tree
Hide file tree
Showing 29 changed files with 203 additions and 93 deletions.
1 change: 1 addition & 0 deletions include/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,7 @@ void stripslashes(std::string& str, bool is_gbk);
extern void update_schema_conf_common(const std::string& table_name, const pb::SchemaConf& schema_conf, pb::SchemaConf* p_conf);
extern void update_op_version(pb::SchemaConf* p_conf, const std::string& desc);
extern int primitive_to_proto_type(pb::PrimitiveType type);
extern int primitive_type_bytes_len(pb::PrimitiveType type);
extern int get_physical_room(const std::string& ip_and_port_str, std::string& host);
extern int get_instance_from_bns(int* ret,
const std::string& bns_name,
Expand Down
4 changes: 3 additions & 1 deletion include/common/histogram.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,9 @@ class PacketSample {
sample_sorter.insert_row(batch->get_row().get());
}
batch->reset();
_batch_vector.push_back(batch);
if (batch->size() > 0) {
_batch_vector.push_back(batch);
}
} while (!eos);

sample_sorter.insert_done();
Expand Down
2 changes: 1 addition & 1 deletion include/exec/dml_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class DMLNode : public ExecNode {
virtual void find_place_holder(std::unordered_multimap<int, ExprNode*>& placeholders);
int insert_row(RuntimeState* state, SmartRecord record, bool is_update = false);
int delete_row(RuntimeState* state, SmartRecord record, MemRow* row);
int get_lock_row(RuntimeState* state, SmartRecord record, std::string* pk_str, MemRow* row);
int get_lock_row(RuntimeState* state, SmartRecord record, std::string* pk_str, MemRow* row, int64_t& ttl_ts);
int remove_row(RuntimeState* state, SmartRecord record,
const std::string& pk_str, bool delete_primary = true);
int update_row(RuntimeState* state, SmartRecord record, MemRow* row);
Expand Down
3 changes: 3 additions & 0 deletions include/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ class ExecNode {
}
void set_limit(int64_t limit) {
_limit = limit;
if (_limit < 0) {
_limit = -1;
}
}
virtual void reset_limit(int64_t limit) {
_limit = limit;
Expand Down
9 changes: 9 additions & 0 deletions include/exec/filter_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ class FilterNode : public ExecNode {
}
}

bool is_empty_filter() {
//只有scan_node做索引条件的时候会用_pruned_conjuncts
if (_children.size() > 0 && _children[0]->node_type() == pb::SCAN_NODE) {
return _pruned_conjuncts.empty();
} else {
return _conjuncts.empty();
}
}

private:
bool need_copy(MemRow* row);
private:
Expand Down
1 change: 1 addition & 0 deletions include/logical_plan/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ class QueryContext {
return plan.add_nodes();
}
int create_plan_tree();
int destroy_plan_tree();

void add_sub_ctx(std::shared_ptr<QueryContext>& ctx) {
std::unique_lock<bthread::Mutex> lck(_kill_lock);
Expand Down
23 changes: 13 additions & 10 deletions include/meta_server/table_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -984,11 +984,11 @@ class TableManager {
auto src_type = src_field.mysql_type();
auto target_type = target_field.mysql_type();
if (src_type == target_type) {
// if (src_type == pb::DATETIME || src_type == pb::FLOAT || src_type == pb::DOUBLE) {
// if (src_field.float_precision_len() > target_field.float_precision_len()) {
// return false;
// }
// }
if (src_type == pb::DATETIME) {
if (src_field.float_precision_len() > target_field.float_precision_len()) {
return false;
}
}
return true;
}
switch (src_type) {
Expand All @@ -1007,11 +1007,14 @@ class TableManager {
}
int s = primitive_to_proto_type(src_type);
int t = primitive_to_proto_type(target_type);
if (s == t) return true;
if (s == FieldDescriptorProto::TYPE_SINT32 && t == FieldDescriptorProto::TYPE_SINT64) return true;
if (s == FieldDescriptorProto::TYPE_SINT64 && t == FieldDescriptorProto::TYPE_SINT32) return true;
if (s == FieldDescriptorProto::TYPE_UINT32 && t == FieldDescriptorProto::TYPE_UINT64) return true;
if (s == FieldDescriptorProto::TYPE_UINT64 && t == FieldDescriptorProto::TYPE_UINT32) return true;
if (s != t) return false;
if (primitive_type_bytes_len(src_type) <= primitive_type_bytes_len(target_type)) {
return true;
}
// if (s == FieldDescriptorProto::TYPE_SINT32 && t == FieldDescriptorProto::TYPE_SINT64) return true;
// if (s == FieldDescriptorProto::TYPE_SINT64 && t == FieldDescriptorProto::TYPE_SINT32) return true;
// if (s == FieldDescriptorProto::TYPE_UINT32 && t == FieldDescriptorProto::TYPE_UINT64) return true;
// if (s == FieldDescriptorProto::TYPE_UINT64 && t == FieldDescriptorProto::TYPE_UINT32) return true;
return false;
}
int get_index_state(int64_t table_id, int64_t index_id, pb::IndexState& index_state) {
Expand Down
1 change: 1 addition & 0 deletions include/physical_plan/join_reorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace baikaldb {
class JoinReorder {
public:
int analyze(QueryContext* ctx);
int reorder(QueryContext* ctx, ExecNode* node);
};
}

Expand Down
2 changes: 1 addition & 1 deletion include/sqlparser/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ struct String {
size_t fast = 0;
bool has_slash = false;
static std::unordered_map<char, char> trans_map = {
{'0', '\x00'},
{'\\', '\\'},
{'\"', '\"'},
{'\'', '\''},
Expand All @@ -96,7 +97,6 @@ struct String {
{'n', '\n'},
{'b', '\b'},
{'Z', '\x1A'},
{'0', '\0'},
};
while (fast < length) {
if (has_slash) {
Expand Down
1 change: 1 addition & 0 deletions proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ message UpdateNode {
repeated SlotDescriptor update_slots = 3;
repeated Expr update_exprs = 4;
//repeated int64 affect_index_ids = 5;
optional int64 row_ttl_duration = 6; //row ttl support, compatible whit prepared stmt
};

message PacketNode {
Expand Down
18 changes: 18 additions & 0 deletions src/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ void stripslashes(std::string& str, bool is_gbk) {
size_t fast = 0;
bool has_slash = false;
static std::unordered_map<char, char> trans_map = {
{'0', '\x00'},
{'\\', '\\'},
{'\"', '\"'},
{'\'', '\''},
Expand Down Expand Up @@ -439,6 +440,23 @@ void update_schema_conf_common(const std::string& table_name, const pb::SchemaCo
DB_WARNING("%s schema conf UPDATE TO : %s", table_name.c_str(), schema_conf.ShortDebugString().c_str());
}

int primitive_type_bytes_len(pb::PrimitiveType type) {
static std::unordered_map<int32_t, int32_t> _mysql_pb_type_bytes_count = {
{ pb::INT8, 1},
{ pb::INT16, 2},
{ pb::INT32, 3},
{ pb::INT64, 4},
{ pb::UINT8, 1},
{ pb::UINT16, 2},
{ pb::UINT32, 3},
{ pb::UINT64, 4}
};
if (_mysql_pb_type_bytes_count.count(type) == 0) {
return 0xFFFF;
}
return _mysql_pb_type_bytes_count[type];
}

int primitive_to_proto_type(pb::PrimitiveType type) {
using google::protobuf::FieldDescriptorProto;
static std::unordered_map<int32_t, int32_t> _mysql_pb_type_mapping = {
Expand Down
2 changes: 1 addition & 1 deletion src/common/information_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1216,7 +1216,7 @@ void InformationSchema::init_tables() {
record->set_value(record->get_field_by_name("CREATE_TIME"), ct.cast_to(pb::DATETIME));
record->set_string(record->get_field_by_name("TABLE_COLLATION"), coll);
record->set_string(record->get_field_by_name("CREATE_OPTIONS"), "");
record->set_string(record->get_field_by_name("TABLE_COMMENT"), "");
record->set_string(record->get_field_by_name("TABLE_COMMENT"), table_info->comment);
record->set_int64(record->get_field_by_name("TABLE_ID"), table_info->id);
records.emplace_back(record);
}
Expand Down
3 changes: 2 additions & 1 deletion src/exec/access_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ bool AccessPath::check_sort_use_index(Property& sort_property) {
std::vector<ExprNode*>& order_exprs = sort_property.slot_order_exprs;
SlotRef* slot_ref = static_cast<SlotRef*>(order_exprs[0]);
size_t idx = 0;
auto& fields = index_info_ptr->fields;
std::vector<FieldInfo>fields(index_info_ptr->fields.begin(), index_info_ptr->fields.end());
fields.insert(fields.end(),index_info_ptr->pk_fields.begin(), index_info_ptr->pk_fields.end());
for (; idx < fields.size(); ++idx) {
if (tuple_id == slot_ref->tuple_id() && fields[idx].id == slot_ref->field_id()) {
break;
Expand Down
19 changes: 14 additions & 5 deletions src/exec/dml_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ int DMLNode::insert_row(RuntimeState* state, SmartRecord record, bool is_update)
return ++affected_rows;
}

int DMLNode::get_lock_row(RuntimeState* state, SmartRecord record, std::string* pk_str, MemRow* row) {
int DMLNode::get_lock_row(RuntimeState* state, SmartRecord record, std::string* pk_str, MemRow* row, int64_t& ttl_ts) {
int ret = 0;
MutTableKey pk_key;
ret = record->encode_key(*_pri_info, pk_key, -1, false);
Expand All @@ -522,12 +522,12 @@ int DMLNode::get_lock_row(RuntimeState* state, SmartRecord record, std::string*
record->decode_key(*_pri_info, *pk_str);
}
//delete requires all fields (index and non-index fields)
ret = _txn->get_update_primary(_region_id, *_pri_info, record, _field_ids, GET_LOCK, true);
ret = _txn->get_update_primary(_region_id, *_pri_info, record, _field_ids, GET_LOCK, true, ttl_ts);
if (ret < 0) {
return ret;
}
if (row != nullptr && _tuple_desc != nullptr
&& (_node_type == pb::DELETE_NODE || _node_type == pb::UPDATE_NODE)) {
&& (_node_type == pb::DELETE_NODE || _node_type == pb::UPDATE_NODE || _node_type == pb::LOCK_PRIMARY_NODE)) {
for (auto slot : _tuple_desc->slots()) {
auto field = record->get_field_by_tag(slot.field_id());
row->set_value(slot.tuple_id(), slot.slot_id(),
Expand Down Expand Up @@ -637,7 +637,8 @@ int DMLNode::remove_row(RuntimeState* state, SmartRecord record,
int DMLNode::delete_row(RuntimeState* state, SmartRecord record, MemRow* row) {
int ret = 0;
std::string pk_str;
ret = get_lock_row(state, record, &pk_str, row);
int64_t ttl_ts = 0;
ret = get_lock_row(state, record, &pk_str, row, ttl_ts);
if (ret == -3) {
//DB_WARNING_STATE(state, "key not in this region:%ld", _region_id);
return 0;
Expand Down Expand Up @@ -673,7 +674,8 @@ bool DMLNode::satisfy_condition_again(RuntimeState* state, MemRow* row) {
int DMLNode::update_row(RuntimeState* state, SmartRecord record, MemRow* row) {
int ret = 0;
std::string pk_str;
ret = get_lock_row(state, record, &pk_str, row);
int64_t ttl_ts = 0;
ret = get_lock_row(state, record, &pk_str, row, ttl_ts);
if (ret == -3) {
//DB_WARNING_STATE(state, "key not in this region:%ld", _region_id);
return 0;
Expand All @@ -689,6 +691,13 @@ int DMLNode::update_row(RuntimeState* state, SmartRecord record, MemRow* row) {
// UndoGetForUpdate(pk_str)? 同一个txn GetForUpdate与UndoGetForUpdate之间不要写pk_str
return 0;
}
// _row_ttl_duration == -1 代表保持原ttl意思
// TODO: 全局索引 keep ttl功能
if (_row_ttl_duration == -1 && _ttl_timestamp_us > 0 && ttl_ts > 0) {
_ttl_timestamp_us = ttl_ts;
_txn->set_write_ttl_timestamp_us(_ttl_timestamp_us);
DB_DEBUG("keep ttl_timestamp_us: %ld", _ttl_timestamp_us);
}
_indexes_ptr = &_affected_indexes;
// 影响了主键需要删除旧的行
ret = remove_row(state, record, pk_str, _update_affect_primary);
Expand Down
7 changes: 7 additions & 0 deletions src/exec/fetcher_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ void OnRPCDone::select_addr() {
if (peer_index < sorted_peers.size()) {
_addr = sorted_peers[peer_index];
DB_WARNING("choose peer %s, index: %ld", _addr.c_str(), peer_index);
FetcherStore::choose_other_if_dead(_info, _addr);
}
}
_request.set_select_without_leader(true);
Expand Down Expand Up @@ -1015,15 +1016,21 @@ void FetcherStore::choose_other_if_dead(pb::RegionInfo& info, std::string& addr)
}

std::vector<std::string> normal_peers;
std::vector<std::string> other_peers;
for (auto& peer: info.peers()) {
auto status = schema_factory->get_instance_status(peer);
if (status.status == pb::NORMAL) {
normal_peers.push_back(peer);
} else if (status.status != pb::DEAD) {
other_peers.push_back(peer);
}
}
if (normal_peers.size() > 0) {
uint32_t i = butil::fast_rand() % normal_peers.size();
addr = normal_peers[i];
} else if(other_peers.size() > 0) {
uint32_t i = butil::fast_rand() % other_peers.size();
addr = other_peers[i];
} else {
DB_DEBUG("all peer faulty, %ld", info.region_id());
}
Expand Down
3 changes: 2 additions & 1 deletion src/exec/lock_primary_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ int LockPrimaryNode::open(RuntimeState* state) {
case pb::LOCK_GET_DML: {
for (auto& record : delete_records) {
//DB_WARNING_STATE(state,"record:%s", record->debug_string().c_str());
ret = delete_row(state, record, nullptr);
std::unique_ptr<MemRow> row = state->mem_row_desc()->fetch_mem_row();
ret = delete_row(state, record, row.get());
if (ret < 0) {
DB_WARNING_STATE(state, "delete_row fail");
return -1;
Expand Down
12 changes: 7 additions & 5 deletions src/exec/packet_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,10 @@ int PacketNode::open(RuntimeState* state) {
}
_send_buf = state->send_buf();
int ret = 0;
if (state->explain_type == EXPLAIN_SHOW_COST) {
handle_show_cost(state);
return 0;
}
if (!_return_empty || op_type() == pb::OP_SELECT) {
ret = ExecNode::open(state);
if (ret < 0) {
Expand Down Expand Up @@ -484,10 +488,6 @@ int PacketNode::open(RuntimeState* state) {
}
return 0;
}
if (state->explain_type == EXPLAIN_SHOW_COST) {
handle_show_cost(state);
return 0;
}
state->set_num_affected_rows(ret);
if (op_type() != pb::OP_SELECT && op_type() != pb::OP_UNION) {
pack_ok(state->num_affected_rows(), _client);
Expand Down Expand Up @@ -682,7 +682,9 @@ int PacketNode::open_analyze(RuntimeState* state) {
return ret;
}
state->inc_num_returned_rows(batch->size());
batch_vector.push_back(batch);
if (batch->size() > 0) {
batch_vector.push_back(batch);
}
} while (!eos);

std::vector<ExprNode*> slot_order_exprs;
Expand Down
2 changes: 2 additions & 0 deletions src/exec/update_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ int UpdateNode::init(const pb::PlanNode& node) {
}
_table_id = node.derive_node().update_node().table_id();
_global_index_id = _table_id;
_row_ttl_duration = node.derive_node().update_node().row_ttl_duration();
DB_DEBUG("_row_ttl_duration:%ld", _row_ttl_duration);
_primary_slots.clear();
_primary_slots.reserve(node.derive_node().update_node().primary_slots_size());
for (auto& slot : node.derive_node().update_node().primary_slots()) {
Expand Down
2 changes: 1 addition & 1 deletion src/logical_plan/insert_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ int InsertPlanner::plan() {
insert->set_need_ignore(_insert_stmt->is_ignore);
insert->set_is_replace(_insert_stmt->is_replace);
insert->set_is_merge(_insert_stmt->is_merge);
if (_ctx->row_ttl_duration > 0) {
if (_ctx->row_ttl_duration > 0 || _ctx->row_ttl_duration == -1) {
insert->set_row_ttl_duration(_ctx->row_ttl_duration);
DB_DEBUG("row_ttl_duration: %ld", _ctx->row_ttl_duration);
}
Expand Down
Loading

0 comments on commit 5638a1c

Please sign in to comment.