diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index 8be810f1559..42e7428b8e6 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -875,54 +875,41 @@ bool SQLClusterRouter::DropTable(const std::string& db, const std::string& table } } - // delete pre-aggr meta info if need - if (table_info->base_table_tid() > 0) { - std::string meta_db = openmldb::nameserver::INTERNAL_DB; - std::string meta_table = openmldb::nameserver::PRE_AGG_META_NAME; - std::string select_aggr_info = - absl::StrCat("select base_db,base_table,aggr_func,aggr_col,partition_cols,order_by_col,filter_col from ", - meta_db, ".", meta_table, " where aggr_table = '", table_info->name(), "';"); - auto rs = ExecuteSQL("", select_aggr_info, true, true, 0, status); - WARN_NOT_OK_AND_RET(status, "get aggr info failed", false); - if (rs->Size() != 1) { - SET_STATUS_AND_WARN(status, StatusCode::kCmdError, - "duplicate records generate with aggr table name: " + table_info->name()); - return false; - } - std::string idx_key; - if (rs->Next()) { - for (int i = 0; i < rs->GetSchema()->GetColumnCnt(); i++) { - if (!idx_key.empty()) { - idx_key += "|"; - } - auto k = rs->GetAsStringUnsafe(i); - if (k.empty()) { - idx_key += hybridse::codec::EMPTY_STRING; - } else { - idx_key += k; - } + // delete related pre-aggr tables first + std::string meta_db = openmldb::nameserver::INTERNAL_DB; + std::string meta_table = openmldb::nameserver::PRE_AGG_META_NAME; + std::string select_aggr_info = + absl::StrCat("select aggr_db, aggr_table from ", meta_db, ".", meta_table, " where base_table = '", + table_info->name(), "' and base_db='", table_info->db(), "';"); + auto rs = ExecuteSQL("", select_aggr_info, true, true, 0, status); + WARN_NOT_OK_AND_RET(status, "get aggr info failed", false); + if (rs->Size() > 0) { + // drop aggr-tables, if got error, delete manully + std::vector> aggr_tables; + while (rs->Next()) { + std::string aggr_db = rs->GetStringUnsafe(0); + std::string aggr_table = rs->GetStringUnsafe(1); + + if (aggr_db.empty() || aggr_table.empty()) { + WARN_NOT_OK_AND_RET( + status, absl::StrCat("aggr table ", aggr_db, " or ", aggr_table, " is empty, can't delete"), false); } - } else { - SET_STATUS_AND_WARN(status, StatusCode::kCmdError, "access ResultSet failed"); - return false; - } - auto tablet_accessor = cluster_sdk_->GetTablet(meta_db, meta_table, (uint32_t)0); - if (!tablet_accessor) { - SET_STATUS_AND_WARN(status, StatusCode::kCmdError, "get tablet accessor failed"); - return false; - } - auto tablet_client = tablet_accessor->GetClient(); - if (!tablet_client) { - SET_STATUS_AND_WARN(status, StatusCode::kCmdError, "get tablet client failed"); - return false; + if (!DropTable(aggr_db, aggr_table, true, status)) { + WARN_NOT_OK_AND_RET(status, absl::StrCat("drop aggr table ", aggr_db, ".", aggr_table, " failed"), + false); + } + aggr_tables.emplace_back(aggr_db, aggr_table); } - auto tid = cluster_sdk_->GetTableId(meta_db, meta_table); - std::string msg; - if (!tablet_client->Delete(tid, 0, table_info->name(), "aggr_table", msg) || - !tablet_client->Delete(tid, 0, idx_key, "unique_key", msg)) { - SET_STATUS_AND_WARN(status, StatusCode::kCmdError, "delete aggr meta failed"); - return false; + // drop pre-agg meta in meta table + for (auto& aggr_table : aggr_tables) { + LOG(INFO) << "drop aggr meta " << aggr_table.first << "." << aggr_table.second << "by table name"; + std::string delete_aggr_info = + absl::StrCat("delete from ", meta_db, ".", meta_table, " where aggr_table='", aggr_table.second, "';"); + auto rs = ExecuteSQL("", delete_aggr_info, true, true, 0, status); + WARN_NOT_OK_AND_RET(status, "delete aggr info failed", false); } + } else { + LOG(INFO) << "no related pre-aggr tables"; } // Check offline table info first @@ -1280,7 +1267,7 @@ bool SQLClusterRouter::ExecuteInsert(const std::string& db, const std::string& s std::vector fails; if (!codegen_rows.empty()) { - for (size_t i = 0 ; i < codegen_rows.size(); ++i) { + for (size_t i = 0; i < codegen_rows.size(); ++i) { auto r = codegen_rows[i]; auto row = std::make_shared(table_info, schema, r, put_if_absent); if (!PutRow(table_info->tid(), row, tablets, status)) { @@ -1695,7 +1682,7 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h } case hybridse::node::kCmdShowUser: { - std::vector value = { options_->user }; + std::vector value = {options_->user}; return ResultSetSQL::MakeResultSet({"User"}, {value}, status); } @@ -2740,7 +2727,8 @@ std::shared_ptr SQLClusterRouter::ExecuteSQL( } case hybridse::node::kPlanTypeCreateUser: { auto create_node = dynamic_cast(node); - UserInfo user_info;; + UserInfo user_info; + ; auto result = GetUser(create_node->Name(), &user_info); if (!result.ok()) { *status = {StatusCode::kCmdError, result.status().message()}; @@ -2954,7 +2942,7 @@ std::shared_ptr SQLClusterRouter::ExecuteSQL( if (is_online_mode) { // Handle in online mode config.emplace("spark.insert_memory_usage_limit", - std::to_string(insert_memory_usage_limit_.load(std::memory_order_relaxed))); + std::to_string(insert_memory_usage_limit_.load(std::memory_order_relaxed))); base_status = ImportOnlineData(sql, config, database, is_sync_job, offline_job_timeout, &job_info); } else { // Handle in offline mode @@ -4874,10 +4862,10 @@ std::shared_ptr SQLClusterRouter::GetNameServerJobResu } absl::StatusOr SQLClusterRouter::GetUser(const std::string& name, UserInfo* user_info) { - std::string sql = absl::StrCat("select * from ", nameserver::USER_INFO_NAME); + std::string sql = absl::StrCat("select * from ", nameserver::USER_INFO_NAME); hybridse::sdk::Status status; - auto rs = ExecuteSQLParameterized(nameserver::INTERNAL_DB, sql, - std::shared_ptr(), &status); + auto rs = + ExecuteSQLParameterized(nameserver::INTERNAL_DB, sql, std::shared_ptr(), &status); if (rs == nullptr) { return absl::InternalError(status.msg); } @@ -4897,17 +4885,17 @@ hybridse::sdk::Status SQLClusterRouter::AddUser(const std::string& name, const s auto real_password = password.empty() ? password : codec::Encrypt(password); uint64_t cur_ts = ::baidu::common::timer::get_micros() / 1000; std::string sql = absl::StrCat("insert into ", nameserver::USER_INFO_NAME, " values (", - "'%',", // host - "'", name, "','", // user - real_password, "',", // password - cur_ts, ",", // password_last_changed - "0,", // password_expired_time - cur_ts, ", ", // create_time - cur_ts, ",", // update_time - 1, // account_type - ",'',", // privileges - "null" // extra_info - ");"); + "'%',", // host + "'", name, "','", // user + real_password, "',", // password + cur_ts, ",", // password_last_changed + "0,", // password_expired_time + cur_ts, ", ", // create_time + cur_ts, ",", // update_time + 1, // account_type + ",'',", // privileges + "null" // extra_info + ");"); hybridse::sdk::Status status; ExecuteInsert(nameserver::INTERNAL_DB, sql, &status); return status; @@ -4917,25 +4905,25 @@ hybridse::sdk::Status SQLClusterRouter::UpdateUser(const UserInfo& user_info, co auto real_password = password.empty() ? password : codec::Encrypt(password); uint64_t cur_ts = ::baidu::common::timer::get_micros() / 1000; std::string sql = absl::StrCat("insert into ", nameserver::USER_INFO_NAME, " values (", - "'%',", // host - "'", user_info.name, "','", // user - real_password, "',", // password - cur_ts, ",", // password_last_changed - "0,", // password_expired_time - user_info.create_time, ", ", // create_time - cur_ts, ",", // update_time - 1, // account_type - ",'", user_info.privileges, "',", // privileges - "null" // extra_info - ");"); + "'%',", // host + "'", user_info.name, "','", // user + real_password, "',", // password + cur_ts, ",", // password_last_changed + "0,", // password_expired_time + user_info.create_time, ", ", // create_time + cur_ts, ",", // update_time + 1, // account_type + ",'", user_info.privileges, "',", // privileges + "null" // extra_info + ");"); hybridse::sdk::Status status; ExecuteInsert(nameserver::INTERNAL_DB, sql, &status); return status; } hybridse::sdk::Status SQLClusterRouter::DeleteUser(const std::string& name) { - std::string sql = absl::StrCat("delete from ", nameserver::USER_INFO_NAME, - " where host = '%' and user = '", name, "';"); + std::string sql = + absl::StrCat("delete from ", nameserver::USER_INFO_NAME, " where host = '%' and user = '", name, "';"); hybridse::sdk::Status status; ExecuteSQL(nameserver::INTERNAL_DB, sql, &status); return status; @@ -4948,12 +4936,10 @@ void SQLClusterRouter::AddUserToConfig(std::map* confi } } -::hybridse::sdk::Status SQLClusterRouter::RevertPut(const nameserver::TableInfo& table_info, - uint32_t end_pid, - const std::map>>& dimensions, - uint64_t ts, - const base::Slice& value, - const std::vector>& tablets) { +::hybridse::sdk::Status SQLClusterRouter::RevertPut( + const nameserver::TableInfo& table_info, uint32_t end_pid, + const std::map>>& dimensions, uint64_t ts, + const base::Slice& value, const std::vector>& tablets) { codec::RowView row_view(table_info.column_desc()); std::map column_map; for (int32_t i = 0; i < table_info.column_desc_size(); i++) {