Skip to content

Commit

Permalink
fix: drop aggr tables in drop table
Browse files Browse the repository at this point in the history
  • Loading branch information
vagetablechicken committed Apr 29, 2024
1 parent d9bb344 commit ecdf4ea
Showing 1 changed file with 68 additions and 82 deletions.
150 changes: 68 additions & 82 deletions src/sdk/sql_cluster_router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -875,54 +875,41 @@ bool SQLClusterRouter::DropTable(const std::string& db, const std::string& table
}
}

// delete pre-aggr meta info if need
if (table_info->base_table_tid() > 0) {
std::string meta_db = openmldb::nameserver::INTERNAL_DB;
std::string meta_table = openmldb::nameserver::PRE_AGG_META_NAME;
std::string select_aggr_info =
absl::StrCat("select base_db,base_table,aggr_func,aggr_col,partition_cols,order_by_col,filter_col from ",
meta_db, ".", meta_table, " where aggr_table = '", table_info->name(), "';");
auto rs = ExecuteSQL("", select_aggr_info, true, true, 0, status);
WARN_NOT_OK_AND_RET(status, "get aggr info failed", false);
if (rs->Size() != 1) {
SET_STATUS_AND_WARN(status, StatusCode::kCmdError,
"duplicate records generate with aggr table name: " + table_info->name());
return false;
}
std::string idx_key;
if (rs->Next()) {
for (int i = 0; i < rs->GetSchema()->GetColumnCnt(); i++) {
if (!idx_key.empty()) {
idx_key += "|";
}
auto k = rs->GetAsStringUnsafe(i);
if (k.empty()) {
idx_key += hybridse::codec::EMPTY_STRING;
} else {
idx_key += k;
}
// delete related pre-aggr tables first
std::string meta_db = openmldb::nameserver::INTERNAL_DB;
std::string meta_table = openmldb::nameserver::PRE_AGG_META_NAME;
std::string select_aggr_info =
absl::StrCat("select aggr_db, aggr_table from ", meta_db, ".", meta_table, " where base_table = '",
table_info->name(), "' and base_db='", table_info->db(), "';");
auto rs = ExecuteSQL("", select_aggr_info, true, true, 0, status);
WARN_NOT_OK_AND_RET(status, "get aggr info failed", false);
if (rs->Size() > 0) {
// drop aggr-tables, if got error, delete manully
std::vector<std::pair<std::string, std::string>> aggr_tables;
while (rs->Next()) {
std::string aggr_db = rs->GetStringUnsafe(0);
std::string aggr_table = rs->GetStringUnsafe(1);

if (aggr_db.empty() || aggr_table.empty()) {
WARN_NOT_OK_AND_RET(
status, absl::StrCat("aggr table ", aggr_db, " or ", aggr_table, " is empty, can't delete"), false);
}
} else {
SET_STATUS_AND_WARN(status, StatusCode::kCmdError, "access ResultSet failed");
return false;
}
auto tablet_accessor = cluster_sdk_->GetTablet(meta_db, meta_table, (uint32_t)0);
if (!tablet_accessor) {
SET_STATUS_AND_WARN(status, StatusCode::kCmdError, "get tablet accessor failed");
return false;
}
auto tablet_client = tablet_accessor->GetClient();
if (!tablet_client) {
SET_STATUS_AND_WARN(status, StatusCode::kCmdError, "get tablet client failed");
return false;
if (!DropTable(aggr_db, aggr_table, true, status)) {
WARN_NOT_OK_AND_RET(status, absl::StrCat("drop aggr table ", aggr_db, ".", aggr_table, " failed"),
false);
}
aggr_tables.emplace_back(aggr_db, aggr_table);
}
auto tid = cluster_sdk_->GetTableId(meta_db, meta_table);
std::string msg;
if (!tablet_client->Delete(tid, 0, table_info->name(), "aggr_table", msg) ||
!tablet_client->Delete(tid, 0, idx_key, "unique_key", msg)) {
SET_STATUS_AND_WARN(status, StatusCode::kCmdError, "delete aggr meta failed");
return false;
// drop pre-agg meta in meta table
for (auto& aggr_table : aggr_tables) {
LOG(INFO) << "drop aggr meta " << aggr_table.first << "." << aggr_table.second << "by table name";
std::string delete_aggr_info =
absl::StrCat("delete from ", meta_db, ".", meta_table, " where aggr_table='", aggr_table.second, "';");
auto rs = ExecuteSQL("", delete_aggr_info, true, true, 0, status);
WARN_NOT_OK_AND_RET(status, "delete aggr info failed", false);
}
} else {
LOG(INFO) << "no related pre-aggr tables";
}

// Check offline table info first
Expand Down Expand Up @@ -1280,7 +1267,7 @@ bool SQLClusterRouter::ExecuteInsert(const std::string& db, const std::string& s

std::vector<size_t> fails;
if (!codegen_rows.empty()) {
for (size_t i = 0 ; i < codegen_rows.size(); ++i) {
for (size_t i = 0; i < codegen_rows.size(); ++i) {
auto r = codegen_rows[i];
auto row = std::make_shared<SQLInsertRow>(table_info, schema, r, put_if_absent);
if (!PutRow(table_info->tid(), row, tablets, status)) {
Expand Down Expand Up @@ -1695,7 +1682,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::HandleSQLCmd(const h
}

case hybridse::node::kCmdShowUser: {
std::vector<std::string> value = { options_->user };
std::vector<std::string> value = {options_->user};
return ResultSetSQL::MakeResultSet({"User"}, {value}, status);
}

Expand Down Expand Up @@ -2740,7 +2727,8 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteSQL(
}
case hybridse::node::kPlanTypeCreateUser: {
auto create_node = dynamic_cast<hybridse::node::CreateUserPlanNode*>(node);
UserInfo user_info;;
UserInfo user_info;
;
auto result = GetUser(create_node->Name(), &user_info);
if (!result.ok()) {
*status = {StatusCode::kCmdError, result.status().message()};
Expand Down Expand Up @@ -2954,7 +2942,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteSQL(
if (is_online_mode) {
// Handle in online mode
config.emplace("spark.insert_memory_usage_limit",
std::to_string(insert_memory_usage_limit_.load(std::memory_order_relaxed)));
std::to_string(insert_memory_usage_limit_.load(std::memory_order_relaxed)));
base_status = ImportOnlineData(sql, config, database, is_sync_job, offline_job_timeout, &job_info);
} else {
// Handle in offline mode
Expand Down Expand Up @@ -4874,10 +4862,10 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::GetNameServerJobResu
}

absl::StatusOr<bool> SQLClusterRouter::GetUser(const std::string& name, UserInfo* user_info) {
std::string sql = absl::StrCat("select * from ", nameserver::USER_INFO_NAME);
std::string sql = absl::StrCat("select * from ", nameserver::USER_INFO_NAME);
hybridse::sdk::Status status;
auto rs = ExecuteSQLParameterized(nameserver::INTERNAL_DB, sql,
std::shared_ptr<openmldb::sdk::SQLRequestRow>(), &status);
auto rs =
ExecuteSQLParameterized(nameserver::INTERNAL_DB, sql, std::shared_ptr<openmldb::sdk::SQLRequestRow>(), &status);
if (rs == nullptr) {
return absl::InternalError(status.msg);
}
Expand All @@ -4897,17 +4885,17 @@ hybridse::sdk::Status SQLClusterRouter::AddUser(const std::string& name, const s
auto real_password = password.empty() ? password : codec::Encrypt(password);
uint64_t cur_ts = ::baidu::common::timer::get_micros() / 1000;
std::string sql = absl::StrCat("insert into ", nameserver::USER_INFO_NAME, " values (",
"'%',", // host
"'", name, "','", // user
real_password, "',", // password
cur_ts, ",", // password_last_changed
"0,", // password_expired_time
cur_ts, ", ", // create_time
cur_ts, ",", // update_time
1, // account_type
",'',", // privileges
"null" // extra_info
");");
"'%',", // host
"'", name, "','", // user
real_password, "',", // password
cur_ts, ",", // password_last_changed
"0,", // password_expired_time
cur_ts, ", ", // create_time
cur_ts, ",", // update_time
1, // account_type
",'',", // privileges
"null" // extra_info
");");
hybridse::sdk::Status status;
ExecuteInsert(nameserver::INTERNAL_DB, sql, &status);
return status;
Expand All @@ -4917,25 +4905,25 @@ hybridse::sdk::Status SQLClusterRouter::UpdateUser(const UserInfo& user_info, co
auto real_password = password.empty() ? password : codec::Encrypt(password);
uint64_t cur_ts = ::baidu::common::timer::get_micros() / 1000;
std::string sql = absl::StrCat("insert into ", nameserver::USER_INFO_NAME, " values (",
"'%',", // host
"'", user_info.name, "','", // user
real_password, "',", // password
cur_ts, ",", // password_last_changed
"0,", // password_expired_time
user_info.create_time, ", ", // create_time
cur_ts, ",", // update_time
1, // account_type
",'", user_info.privileges, "',", // privileges
"null" // extra_info
");");
"'%',", // host
"'", user_info.name, "','", // user
real_password, "',", // password
cur_ts, ",", // password_last_changed
"0,", // password_expired_time
user_info.create_time, ", ", // create_time
cur_ts, ",", // update_time
1, // account_type
",'", user_info.privileges, "',", // privileges
"null" // extra_info
");");
hybridse::sdk::Status status;
ExecuteInsert(nameserver::INTERNAL_DB, sql, &status);
return status;
}

hybridse::sdk::Status SQLClusterRouter::DeleteUser(const std::string& name) {
std::string sql = absl::StrCat("delete from ", nameserver::USER_INFO_NAME,
" where host = '%' and user = '", name, "';");
std::string sql =
absl::StrCat("delete from ", nameserver::USER_INFO_NAME, " where host = '%' and user = '", name, "';");
hybridse::sdk::Status status;
ExecuteSQL(nameserver::INTERNAL_DB, sql, &status);
return status;
Expand All @@ -4948,12 +4936,10 @@ void SQLClusterRouter::AddUserToConfig(std::map<std::string, std::string>* confi
}
}

::hybridse::sdk::Status SQLClusterRouter::RevertPut(const nameserver::TableInfo& table_info,
uint32_t end_pid,
const std::map<uint32_t, std::vector<std::pair<std::string, uint32_t>>>& dimensions,
uint64_t ts,
const base::Slice& value,
const std::vector<std::shared_ptr<::openmldb::catalog::TabletAccessor>>& tablets) {
::hybridse::sdk::Status SQLClusterRouter::RevertPut(
const nameserver::TableInfo& table_info, uint32_t end_pid,
const std::map<uint32_t, std::vector<std::pair<std::string, uint32_t>>>& dimensions, uint64_t ts,
const base::Slice& value, const std::vector<std::shared_ptr<::openmldb::catalog::TabletAccessor>>& tablets) {
codec::RowView row_view(table_info.column_desc());
std::map<std::string, uint32_t> column_map;
for (int32_t i = 0; i < table_info.column_desc_size(); i++) {
Expand Down

0 comments on commit ecdf4ea

Please sign in to comment.