From 55c1505c151f3e0d260918d5b36adcfb83371589 Mon Sep 17 00:00:00 2001 From: dl239 Date: Tue, 28 Nov 2023 16:01:46 +0800 Subject: [PATCH 01/10] feat: add put memory limit --- src/apiserver/api_server_impl.h | 8 +-- src/base/status.h | 1 + src/base/sys_info.h | 83 ++++++++++++++++++++++++ src/base/sys_info_test.cc | 48 ++++++++++++++ src/client/tablet_client.h | 3 +- src/datacollector/data_collector.cc | 3 +- src/datacollector/data_collector_test.cc | 4 +- src/flags.cc | 1 + src/nameserver/name_server_impl.cc | 18 ++--- src/proto/name_server.proto | 2 +- src/proto/tablet.proto | 5 +- src/storage/table_test.cc | 1 - src/tablet/tablet_impl.cc | 50 ++++++++++---- src/tablet/tablet_impl.h | 5 +- 14 files changed, 197 insertions(+), 35 deletions(-) create mode 100644 src/base/sys_info.h create mode 100644 src/base/sys_info_test.cc diff --git a/src/apiserver/api_server_impl.h b/src/apiserver/api_server_impl.h index ee41e34935b..fc8e8022417 100644 --- a/src/apiserver/api_server_impl.h +++ b/src/apiserver/api_server_impl.h @@ -27,15 +27,13 @@ #include "absl/status/status.h" #include "apiserver/interface_provider.h" #include "apiserver/json_helper.h" -#include "rapidjson/document.h" // raw rapidjson 1.1.0, not in butil +#include "bvar/bvar.h" +#include "bvar/multi_dimension.h" // latency recorder #include "proto/api_server.pb.h" +#include "rapidjson/document.h" // raw rapidjson 1.1.0, not in butil #include "sdk/sql_cluster_router.h" #include "sdk/sql_request_row.h" -#include "absl/status/status.h" -#include "bvar/bvar.h" -#include "bvar/multi_dimension.h" // latency recorder - namespace openmldb { namespace apiserver { diff --git a/src/base/status.h b/src/base/status.h index 5995138edd6..8ac134b18bd 100644 --- a/src/base/status.h +++ b/src/base/status.h @@ -96,6 +96,7 @@ enum ReturnCode { kInvalidArgs = 161, kCheckIndexFailed = 162, kCatalogUpdateFailed = 163, + kExceedPutMemoryLimit = 164, kNameserverIsNotLeader = 300, kAutoFailoverIsEnabled = 301, kEndpointIsNotExist = 302, diff --git a/src/base/sys_info.h b/src/base/sys_info.h new file mode 100644 index 00000000000..6c8e72167b0 --- /dev/null +++ b/src/base/sys_info.h @@ -0,0 +1,83 @@ +/* + * Copyright 2021 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef SRC_BASE_SYS_INFO_H_ +#define SRC_BASE_SYS_INFO_H_ + +#include "absl/strings/ascii.h" +#include "absl/strings/match.h" +#include "absl/strings/numbers.h" +#include "absl/strings/string_view.h" +#include "base/status.h" + +namespace openmldb::base { + +constexpr const char* MEM_TOTAL = "MemTotal"; +constexpr const char* MEM_AVAILABLE = "MemAvailable"; + +struct SysInfo { + uint64_t mem_total = 0; // unit is kB + uint64_t mem_used = 0; // unit is kB + uint64_t mem_available = 0; // unit is kB +}; + +base::Status GetSysMem(SysInfo* info) { +#if defined(__linux__) + FILE *fd = fopen("/proc/meminfo", "r"); + if (fd == nullptr) { + return {ReturnCode::kError, "fail to open meminfo file"}; + } + char line[256]; + auto parse = [](absl::string_view str, absl::string_view key, uint64_t* val) -> base::Status { + str.remove_prefix(key.size() + 1); + str.remove_suffix(2); + str = absl::StripAsciiWhitespace(str); + if (!absl::SimpleAtoi(str, val)) { + return {ReturnCode::kError, absl::StrCat("fail to parse ", key)}; + } + return {}; + }; + int parse_cnt = 0; + while (fgets(line, sizeof(line), fd)) { + absl::string_view str_view(line); + str_view = absl::StripAsciiWhitespace(str_view); + if (absl::StartsWith(str_view, MEM_TOTAL)) { + if (auto status = parse(str_view, MEM_TOTAL, &info->mem_total); !status.OK()) { + return status; + } + parse_cnt++; + } else if (absl::StartsWith(str_view, MEM_AVAILABLE)) { + if (auto status = parse(str_view, MEM_AVAILABLE, &info->mem_available); !status.OK()) { + return status; + } + parse_cnt++; + } + if (parse_cnt >= 2) { + break; + } + } + if (parse_cnt != 2) { + return {ReturnCode::kError, "fail to parse meminfo"}; + } + info->mem_used = info->mem_total - info->mem_available; + fclose(fd); +#endif + return {}; +} + +} // namespace openmldb::base + +#endif // SRC_BASE_SYS_INFO_H_ diff --git a/src/base/sys_info_test.cc b/src/base/sys_info_test.cc new file mode 100644 index 00000000000..2ee372cde1f --- /dev/null +++ b/src/base/sys_info_test.cc @@ -0,0 +1,48 @@ +/* + * Copyright 2021 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "gtest/gtest.h" +#include "base/sys_info.h" + +namespace openmldb { +namespace base { + +class SystemInfoTest : public ::testing::Test { + public: + SystemInfoTest() {} + ~SystemInfoTest() {} +}; + +TEST_F(SystemInfoTest, GetMemory) { + base::SysInfo info; + auto status = base::GetSysMem(&info); + ASSERT_TRUE(status.OK()); + ASSERT_GT(info.mem_total, 0); + ASSERT_GT(info.mem_used, 0); + ASSERT_GT(info.mem_available, 0); + ASSERT_EQ(info.mem_total, info.mem_used + info.mem_available); + printf("total:%lu\n", info.mem_total); + printf("used:%lu\n", info.mem_used); + printf("avl:%lu\n", info.mem_available); +} + +} // namespace base +} // namespace openmldb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/client/tablet_client.h b/src/client/tablet_client.h index 9fee8e08392..16ea736d520 100644 --- a/src/client/tablet_client.h +++ b/src/client/tablet_client.h @@ -82,8 +82,7 @@ class TabletClient : public Client { bool Get(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, std::string& value, // NOLINT uint64_t& ts, // NOLINT - std::string& msg); - ; // NOLINT + std::string& msg); // NOLINT bool Get(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, const std::string& idx_name, std::string& value, // NOLINT diff --git a/src/datacollector/data_collector.cc b/src/datacollector/data_collector.cc index 1af941226cf..30d10b714f9 100644 --- a/src/datacollector/data_collector.cc +++ b/src/datacollector/data_collector.cc @@ -259,7 +259,8 @@ void DataCollectorImpl::CreateTaskEnv(const datasync::AddSyncTaskRequest* reques auto tablet_client = tablet_client_map_[tablet_endpoint]; api::TableStatus table_status; if (auto st = tablet_client->GetTableStatus(tid, pid, table_status); !st.OK()) { - SET_RESP_AND_WARN(response, -1, "get table status from tablet server failed, maybe table doesn't exist: " + st.GetMsg()); + SET_RESP_AND_WARN(response, -1, "get table status from tablet server failed, maybe table doesn't exist: " + + st.GetMsg()); return; } if (!ValidateTableStatus(table_status)) { diff --git a/src/datacollector/data_collector_test.cc b/src/datacollector/data_collector_test.cc index d08cd4b71b8..3f41a48a80e 100644 --- a/src/datacollector/data_collector_test.cc +++ b/src/datacollector/data_collector_test.cc @@ -15,11 +15,11 @@ */ #include "datacollector/data_collector.h" +#include +#include "codec/sdk_codec.h" #include "gflags/gflags.h" #include "gtest/gtest.h" - -#include "codec/sdk_codec.h" #include "vm/engine.h" using openmldb::client::TabletClient; diff --git a/src/flags.cc b/src/flags.cc index 42e085781eb..3366d04accd 100644 --- a/src/flags.cc +++ b/src/flags.cc @@ -34,6 +34,7 @@ DEFINE_string(zk_auth_schema, "digest", "config the id of authentication schema" DEFINE_string(zk_cert, "", "config the application credentials"); DEFINE_string(tablet, "", "config the endpoint of tablet"); DEFINE_string(nameserver, "", "config the endpoint of nameserver"); +DEFINE_int32(get_sys_mem_interval, 10000, "config the interval of get system memory. unit is milliseconds"); DEFINE_int32(zk_keep_alive_check_interval, 15000, "config the interval of keep alive check. unit is milliseconds"); DEFINE_uint32(zk_log_level, 0, "CLI: set level integer, DISABLE_LOGGING=0, " diff --git a/src/nameserver/name_server_impl.cc b/src/nameserver/name_server_impl.cc index d9ce3aff439..ad81c908174 100644 --- a/src/nameserver/name_server_impl.cc +++ b/src/nameserver/name_server_impl.cc @@ -9878,28 +9878,30 @@ base::Status NameServerImpl::InitGlobalVarTable() { } // encode row && dimensions std::vector rows; - std::vector>> rows_dimensions; + std::vector<::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>> rows_dimensions; for (auto iter = default_value.begin(); iter != default_value.end(); iter++) { std::string row; std::vector vec; vec.push_back(iter->first); vec.push_back(iter->second); codec::RowCodec::EncodeRow(vec, table_info->column_desc(), 1, row); - rows.push_back(row); - std::vector> dimensions; + rows.emplace_back(std::move(row)); + ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension> dimensions; // only one index in system table - dimensions.push_back(std::make_pair(iter->first, 0)); - rows_dimensions.push_back(dimensions); + auto dim = dimensions.Add(); + dim->set_idx(0); + dim->set_key(iter->first); + rows_dimensions.emplace_back(std::move(dimensions)); } // insert value uint32_t tid = table_info->tid(); uint32_t pid_num = table_info->table_partition_size(); for (size_t i = 0; i < default_value.size(); i++) { - std::string row = rows[i]; - std::vector> dimensions = rows_dimensions[i]; + const std::string& row = rows[i]; + auto dimensions = rows_dimensions[i]; uint32_t pid = 0; if (pid_num > 0) { - pid = (uint32_t)(::openmldb::base::hash64(dimensions[0].first) % pid_num); + pid = (uint32_t)(::openmldb::base::hash64(dimensions(0).key()) % pid_num); } // system table only have one partition, so table_partition(0) can be used for (int meta_idx = 0; meta_idx < table_info->table_partition(0).partition_meta_size(); meta_idx++) { diff --git a/src/proto/name_server.proto b/src/proto/name_server.proto index 219b83a0b73..e605692d3fe 100755 --- a/src/proto/name_server.proto +++ b/src/proto/name_server.proto @@ -98,7 +98,7 @@ message TableInfo { repeated openmldb.common.ColumnDesc column_desc = 9; repeated openmldb.common.ColumnKey column_key = 10; repeated openmldb.common.ColumnDesc added_column_desc = 11; - optional uint32 format_version = 12 [default = 1]; + optional uint32 format_version = 12 [default = 1, deprecated = true]; optional string db = 13 [default = ""]; repeated string partition_key = 14; repeated common.VersionPair schema_versions = 15; diff --git a/src/proto/tablet.proto b/src/proto/tablet.proto index ee0ec5beae1..2c7a038960b 100755 --- a/src/proto/tablet.proto +++ b/src/proto/tablet.proto @@ -193,7 +193,8 @@ message PutRequest { optional uint32 pid = 5; repeated Dimension dimensions = 6; repeated TSDimension ts_dimensions = 7 [deprecated = true]; - optional uint32 format_version = 8 [default = 0]; + optional uint32 format_version = 8 [default = 0, deprecated = true]; + optional uint32 memory_limit = 9; } message PutResponse { @@ -324,7 +325,7 @@ message TableMeta { repeated openmldb.common.ColumnKey column_key = 11; repeated openmldb.common.ColumnDesc added_column_desc = 12; // format_version 0 , the legacy format 1 ,the new one - optional uint32 format_version = 13 [default = 0]; + optional uint32 format_version = 13 [default = 0, deprecated = true]; optional string db = 14 [default = ""]; repeated common.VersionPair schema_versions = 15; repeated common.TablePartition table_partition = 16; diff --git a/src/storage/table_test.cc b/src/storage/table_test.cc index 8e9b7b09ca8..251e92986c6 100644 --- a/src/storage/table_test.cc +++ b/src/storage/table_test.cc @@ -1924,7 +1924,6 @@ TEST_P(TableTest, NegativeTs) { table_meta.set_seg_cnt(8); table_meta.set_mode(::openmldb::api::TableMode::kTableLeader); table_meta.set_key_entry_max_height(8); - table_meta.set_format_version(1); table_meta.set_storage_mode(storageMode); SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "card", ::openmldb::type::kString); SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "ts1", ::openmldb::type::kBigInt); diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index 2c506be510f..829b5f7593f 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -43,6 +43,7 @@ #include "base/proto_util.h" #include "base/status.h" #include "base/strings.h" +#include "base/sys_info.h" #include "brpc/controller.h" #include "butil/iobuf.h" #include "codec/codec.h" @@ -77,6 +78,7 @@ DECLARE_uint32(scan_max_bytes_size); DECLARE_uint32(scan_reserve_size); DECLARE_uint32(max_memory_mb); DECLARE_double(mem_release_rate); +DECLARE_int32(get_sys_mem_interval); DECLARE_string(db_root_path); DECLARE_string(ssd_root_path); DECLARE_string(hdd_root_path); @@ -135,7 +137,7 @@ TabletImpl::TabletImpl() replicators_(), snapshots_(), zk_client_(nullptr), - keep_alive_pool_(1), + trivial_task_pool_(1), task_pool_(FLAGS_task_pool_size), io_pool_(FLAGS_io_pool_size), snapshot_pool_(FLAGS_snapshot_pool_size), @@ -154,7 +156,7 @@ TabletImpl::TabletImpl() TabletImpl::~TabletImpl() { task_pool_.Stop(true); - keep_alive_pool_.Stop(true); + trivial_task_pool_.Stop(true); gc_pool_.Stop(true); io_pool_.Stop(true); snapshot_pool_.Stop(true); @@ -315,6 +317,9 @@ bool TabletImpl::Init(const std::string& zk_cluster, const std::string& zk_path, #ifdef TCMALLOC_ENABLE MallocExtension* tcmalloc = MallocExtension::instance(); tcmalloc->SetMemoryReleaseRate(FLAGS_mem_release_rate); +#endif +#if defined(__linux__) + trivial_task_pool_.DelayTask(FLAGS_get_sys_mem_interval, boost::bind(&TabletImpl::UpdateMemoryUsage, this)); #endif return true; } @@ -402,7 +407,7 @@ bool TabletImpl::RegisterZK() { LOG(WARNING) << "add notify watcher failed"; return false; } - keep_alive_pool_.DelayTask(FLAGS_zk_keep_alive_check_interval, boost::bind(&TabletImpl::CheckZkClient, this)); + trivial_task_pool_.DelayTask(FLAGS_zk_keep_alive_check_interval, boost::bind(&TabletImpl::CheckZkClient, this)); } return true; } @@ -437,6 +442,17 @@ bool TabletImpl::CheckGetDone(::openmldb::api::GetType type, uint64_t ts, uint64 return false; } +void TabletImpl::UpdateMemoryUsage() { + base::SysInfo info; + if (auto status = base::GetSysMem(&info); status.OK()) { + system_memory_usage_rate_.store(info.mem_used * 100 / info.mem_total, std::memory_order_relaxed); + DEBUGLOG("system_memory_usage_rate is %u", system_memory_usage_rate_.load(std::memory_order_relaxed)); + } else { + PDLOG(WARNING, "GetSysMem run failed. error message %s", status.GetMsg().c_str()); + } + trivial_task_pool_.DelayTask(FLAGS_get_sys_mem_interval, boost::bind(&TabletImpl::UpdateMemoryUsage, this)); +} + int32_t TabletImpl::GetIndex(const ::openmldb::api::GetRequest* request, const ::openmldb::api::TableMeta& meta, const std::map>& vers_schema, CombineIterator* it, std::string* value, uint64_t* ts) { @@ -714,13 +730,22 @@ void TabletImpl::Put(RpcController* controller, const ::openmldb::api::PutReques response->set_msg("table is loading"); return; } - if (table->GetStorageMode() == ::openmldb::common::StorageMode::kMemory && - memory_used_.load(std::memory_order_relaxed) > FLAGS_max_memory_mb) { - PDLOG(WARNING, "current memory %lu MB exceed max memory limit %lu MB. tid %u, pid %u", - memory_used_.load(std::memory_order_relaxed), FLAGS_max_memory_mb, tid, pid); - response->set_code(::openmldb::base::ReturnCode::kExceedMaxMemory); - response->set_msg("exceed max memory"); - return; + if (table->GetStorageMode() == ::openmldb::common::StorageMode::kMemory) { + if (memory_used_.load(std::memory_order_relaxed) > FLAGS_max_memory_mb) { + PDLOG(WARNING, "current memory %lu MB exceed max memory limit %lu MB. tid %u, pid %u", + memory_used_.load(std::memory_order_relaxed), FLAGS_max_memory_mb, tid, pid); + response->set_code(base::ReturnCode::kExceedMaxMemory); + response->set_msg("exceed max memory"); + return; + } + if (request->has_memory_limit() && request->memory_limit() > 0 + && system_memory_usage_rate_.load(std::memory_order_relaxed) > request->memory_limit()) { + PDLOG(WARNING, "current system_memory_usage_rate %u exceed request memory limit %u. tid %u, pid %u", + system_memory_usage_rate_.load(std::memory_order_relaxed), request->memory_limit(), tid, pid); + response->set_code(base::ReturnCode::kExceedPutMemoryLimit); + response->set_msg("exceed memory limit"); + return; + } } ::openmldb::api::LogEntry entry; entry.set_pk(request->pk()); @@ -2968,7 +2993,8 @@ void TabletImpl::LoadTable(RpcController* controller, const ::openmldb::api::Loa std::string db_path = GetDBPath(root_path, tid, pid); if (!::openmldb::base::IsExists(db_path)) { - PDLOG(WARNING, "table db path does not exist, but still load. tid %u, pid %u, path %s", tid, pid, db_path.c_str()); + PDLOG(WARNING, "table db path does not exist, but still load. tid %u, pid %u, path %s", + tid, pid, db_path.c_str()); } std::shared_ptr table = GetTable(tid, pid); @@ -4265,7 +4291,7 @@ void TabletImpl::CheckZkClient() { PDLOG(INFO, "registe zk ok"); } } - keep_alive_pool_.DelayTask(FLAGS_zk_keep_alive_check_interval, boost::bind(&TabletImpl::CheckZkClient, this)); + trivial_task_pool_.DelayTask(FLAGS_zk_keep_alive_check_interval, boost::bind(&TabletImpl::CheckZkClient, this)); } } diff --git a/src/tablet/tablet_impl.h b/src/tablet/tablet_impl.h index c24a253c9e9..9d28012ef5c 100644 --- a/src/tablet/tablet_impl.h +++ b/src/tablet/tablet_impl.h @@ -435,6 +435,8 @@ class TabletImpl : public ::openmldb::api::TabletServer { // refresh the pre-aggr tables info bool RefreshAggrCatalog(); + void UpdateMemoryUsage(); + private: Tables tables_; std::mutex mu_; @@ -444,7 +446,7 @@ class TabletImpl : public ::openmldb::api::TabletServer { Snapshots snapshots_; Aggregators aggregators_; ZkClient* zk_client_; - ThreadPool keep_alive_pool_; + ThreadPool trivial_task_pool_; ThreadPool task_pool_; ThreadPool io_pool_; ThreadPool snapshot_pool_; @@ -474,6 +476,7 @@ class TabletImpl : public ::openmldb::api::TabletServer { std::unique_ptr deploy_collector_; std::atomic memory_used_ = 0; + std::atomic system_memory_usage_rate_ = 0; // [0, 100] }; } // namespace tablet From c36849ef84895fa8dd2455e639681654dccef7d8 Mon Sep 17 00:00:00 2001 From: dl239 Date: Tue, 28 Nov 2023 16:09:37 +0800 Subject: [PATCH 02/10] refact: revert nameserver --- src/datacollector/data_collector_test.cc | 2 +- src/nameserver/name_server_impl.cc | 18 ++++++++---------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/datacollector/data_collector_test.cc b/src/datacollector/data_collector_test.cc index 3f41a48a80e..cbd11b41fdd 100644 --- a/src/datacollector/data_collector_test.cc +++ b/src/datacollector/data_collector_test.cc @@ -14,10 +14,10 @@ * limitations under the License. */ -#include "datacollector/data_collector.h" #include #include "codec/sdk_codec.h" +#include "datacollector/data_collector.h" #include "gflags/gflags.h" #include "gtest/gtest.h" #include "vm/engine.h" diff --git a/src/nameserver/name_server_impl.cc b/src/nameserver/name_server_impl.cc index ad81c908174..d9ce3aff439 100644 --- a/src/nameserver/name_server_impl.cc +++ b/src/nameserver/name_server_impl.cc @@ -9878,30 +9878,28 @@ base::Status NameServerImpl::InitGlobalVarTable() { } // encode row && dimensions std::vector rows; - std::vector<::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>> rows_dimensions; + std::vector>> rows_dimensions; for (auto iter = default_value.begin(); iter != default_value.end(); iter++) { std::string row; std::vector vec; vec.push_back(iter->first); vec.push_back(iter->second); codec::RowCodec::EncodeRow(vec, table_info->column_desc(), 1, row); - rows.emplace_back(std::move(row)); - ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension> dimensions; + rows.push_back(row); + std::vector> dimensions; // only one index in system table - auto dim = dimensions.Add(); - dim->set_idx(0); - dim->set_key(iter->first); - rows_dimensions.emplace_back(std::move(dimensions)); + dimensions.push_back(std::make_pair(iter->first, 0)); + rows_dimensions.push_back(dimensions); } // insert value uint32_t tid = table_info->tid(); uint32_t pid_num = table_info->table_partition_size(); for (size_t i = 0; i < default_value.size(); i++) { - const std::string& row = rows[i]; - auto dimensions = rows_dimensions[i]; + std::string row = rows[i]; + std::vector> dimensions = rows_dimensions[i]; uint32_t pid = 0; if (pid_num > 0) { - pid = (uint32_t)(::openmldb::base::hash64(dimensions(0).key()) % pid_num); + pid = (uint32_t)(::openmldb::base::hash64(dimensions[0].first) % pid_num); } // system table only have one partition, so table_partition(0) can be used for (int meta_idx = 0; meta_idx < table_info->table_partition(0).partition_meta_size(); meta_idx++) { From 857cfd6e0891529804fdec55ab6d50378b87a6ac Mon Sep 17 00:00:00 2001 From: 4paradigm <4paradigm@denglong.local> Date: Thu, 30 Nov 2023 18:13:14 +0800 Subject: [PATCH 03/10] feat: add insert_memory_limit option --- .../src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java | 2 ++ .../openmldb/spark/write/OpenmldbDataSingleWriter.java | 1 + .../_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java | 2 ++ src/sdk/sql_router.h | 1 + 4 files changed, 6 insertions(+) diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java index 83dd73cf657..00045237dd8 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java @@ -45,6 +45,7 @@ public class SdkOption { private int glogLevel = 0; private String glogDir = ""; private int maxSqlCacheSize = 50; + private int insertMemoryUsageLimit = 0; // [0-100], the default value 0 means unlimited private void buildBaseOptions(BasicRouterOptions opt) { opt.setEnable_debug(getEnableDebug()); @@ -52,6 +53,7 @@ private void buildBaseOptions(BasicRouterOptions opt) { opt.setGlog_level(getGlogLevel()); opt.setGlog_dir(getGlogDir()); opt.setMax_sql_cache_size(getMaxSqlCacheSize()); + opt.setInsert_memory_usage_limit(getInsertMemoryUsageLimit()); } public SQLRouterOptions buildSQLRouterOptions() throws SqlException { diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java index fd767d4eebb..56d61f33e66 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java @@ -44,6 +44,7 @@ public OpenmldbDataSingleWriter(OpenmldbWriteConfig config, int partitionId, lon SdkOption option = new SdkOption(); option.setZkCluster(config.zkCluster); option.setZkPath(config.zkPath); + option.setInsertMemoryUsageLimit(config.memoryLimit); SqlClusterExecutor executor = new SqlClusterExecutor(option); String dbName = config.dbName; String tableName = config.tableName; diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java index 89c2d801ca5..ca1161ab110 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java @@ -24,6 +24,7 @@ // Must serializable public class OpenmldbWriteConfig implements Serializable { public final String dbName, tableName, zkCluster, zkPath, writerType; + public final int insertMemoryUsageLimit; public OpenmldbWriteConfig(String dbName, String tableName, SdkOption option, String writerType) { this.dbName = dbName; @@ -31,6 +32,7 @@ public OpenmldbWriteConfig(String dbName, String tableName, SdkOption option, St this.zkCluster = option.getZkCluster(); this.zkPath = option.getZkPath(); this.writerType = writerType; + this.insertMemoryUsageLimit = option.getInsertMemoryUsageLimit(); // TODO(hw): other configs in SdkOption } } diff --git a/src/sdk/sql_router.h b/src/sdk/sql_router.h index 68186a83b00..85cfa03f492 100644 --- a/src/sdk/sql_router.h +++ b/src/sdk/sql_router.h @@ -49,6 +49,7 @@ struct BasicRouterOptions { int glog_level = 0; // empty means to stderr std::string glog_dir = ""; + int insert_memory_usage_limit = 0; }; struct SQLRouterOptions : BasicRouterOptions { From b566b7ce22c2fc21c90038ef1f01d567c59cdcfc Mon Sep 17 00:00:00 2001 From: dl239 Date: Fri, 1 Dec 2023 15:17:19 +0800 Subject: [PATCH 04/10] feat: limit memory in sdk --- .../com/_4paradigm/openmldb/sdk/SdkOption.java | 2 -- .../spark/write/OpenmldbDataSingleWriter.java | 1 - .../spark/write/OpenmldbWriteConfig.java | 2 -- src/client/tablet_client.cc | 13 ++++++++++--- src/client/tablet_client.h | 6 ++++-- src/sdk/sql_cluster_router.cc | 18 ++++++++++++++++-- src/sdk/sql_cluster_router.h | 1 + src/sdk/sql_router.h | 1 - 8 files changed, 31 insertions(+), 13 deletions(-) diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java index 00045237dd8..83dd73cf657 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java @@ -45,7 +45,6 @@ public class SdkOption { private int glogLevel = 0; private String glogDir = ""; private int maxSqlCacheSize = 50; - private int insertMemoryUsageLimit = 0; // [0-100], the default value 0 means unlimited private void buildBaseOptions(BasicRouterOptions opt) { opt.setEnable_debug(getEnableDebug()); @@ -53,7 +52,6 @@ private void buildBaseOptions(BasicRouterOptions opt) { opt.setGlog_level(getGlogLevel()); opt.setGlog_dir(getGlogDir()); opt.setMax_sql_cache_size(getMaxSqlCacheSize()); - opt.setInsert_memory_usage_limit(getInsertMemoryUsageLimit()); } public SQLRouterOptions buildSQLRouterOptions() throws SqlException { diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java index 56d61f33e66..fd767d4eebb 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java @@ -44,7 +44,6 @@ public OpenmldbDataSingleWriter(OpenmldbWriteConfig config, int partitionId, lon SdkOption option = new SdkOption(); option.setZkCluster(config.zkCluster); option.setZkPath(config.zkPath); - option.setInsertMemoryUsageLimit(config.memoryLimit); SqlClusterExecutor executor = new SqlClusterExecutor(option); String dbName = config.dbName; String tableName = config.tableName; diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java index ca1161ab110..89c2d801ca5 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java @@ -24,7 +24,6 @@ // Must serializable public class OpenmldbWriteConfig implements Serializable { public final String dbName, tableName, zkCluster, zkPath, writerType; - public final int insertMemoryUsageLimit; public OpenmldbWriteConfig(String dbName, String tableName, SdkOption option, String writerType) { this.dbName = dbName; @@ -32,7 +31,6 @@ public OpenmldbWriteConfig(String dbName, String tableName, SdkOption option, St this.zkCluster = option.getZkCluster(); this.zkPath = option.getZkPath(); this.writerType = writerType; - this.insertMemoryUsageLimit = option.getInsertMemoryUsageLimit(); // TODO(hw): other configs in SdkOption } } diff --git a/src/client/tablet_client.cc b/src/client/tablet_client.cc index 878d2a5f3cc..4fade044455 100644 --- a/src/client/tablet_client.cc +++ b/src/client/tablet_client.cc @@ -202,19 +202,26 @@ bool TabletClient::UpdateTableMetaForAddField(uint32_t tid, const std::vector>& dimensions) { + const std::vector>& dimensions, + int memory_usage_limit) { ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension> pb_dimensions; for (size_t i = 0; i < dimensions.size(); i++) { ::openmldb::api::Dimension* d = pb_dimensions.Add(); d->set_key(dimensions[i].first); d->set_idx(dimensions[i].second); } - return Put(tid, pid, time, base::Slice(value), &pb_dimensions); + return Put(tid, pid, time, base::Slice(value), &pb_dimensions, memory_usage_limit); } bool TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Slice& value, - ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>* dimensions) { + ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>* dimensions, + int memory_usage_limit) { ::openmldb::api::PutRequest request; + if (memory_usage_limit < 0 || memory_usage_limit > 100) { + return false; + } else if (memory_usage_limit > 0) { + request.set_memory_limit(memory_usage_limit); + } request.set_time(time); request.set_value(value.data(), value.size()); request.set_tid(tid); diff --git a/src/client/tablet_client.h b/src/client/tablet_client.h index 16ea736d520..c09c3c4532e 100644 --- a/src/client/tablet_client.h +++ b/src/client/tablet_client.h @@ -75,10 +75,12 @@ class TabletClient : public Client { bool Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, const std::string& value); bool Put(uint32_t tid, uint32_t pid, uint64_t time, const std::string& value, - const std::vector>& dimensions); + const std::vector>& dimensions, + int memory_usage_limit = 0); bool Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Slice& value, - ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>* dimensions); + ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>* dimensions, + int memory_usage_limit = 0); bool Get(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, std::string& value, // NOLINT uint64_t& ts, // NOLINT diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index 1a55e94fb2e..4b17d3680fb 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -319,6 +319,7 @@ bool SQLClusterRouter::Init() { session_variables_.emplace("enable_trace", "false"); session_variables_.emplace("sync_job", "false"); session_variables_.emplace("job_timeout", "60000"); // rpc request timeout for taskmanager + session_variables_.emplace("insert_memory_usage_limit", "0"); } return true; } @@ -1362,7 +1363,8 @@ bool SQLClusterRouter::PutRow(uint32_t tid, const std::shared_ptr& if (client) { DLOG(INFO) << "put data to endpoint " << client->GetEndpoint() << " with dimensions size " << kv.second.size(); - bool ret = client->Put(tid, pid, cur_ts, row->GetRow(), kv.second); + bool ret = client->Put(tid, pid, cur_ts, row->GetRow(), kv.second, + insert_memory_usage_limit_.load(std::memory_order_relaxed)); if (!ret) { SET_STATUS_AND_WARN(status, StatusCode::kCmdError, "INSERT failed, tid " + std::to_string(tid) + @@ -1478,7 +1480,8 @@ bool SQLClusterRouter::ExecuteInsert(const std::string& db, const std::string& n if (client) { DLOG(INFO) << "put data to endpoint " << client->GetEndpoint() << " with dimensions size " << kv.second.size(); - bool ret = client->Put(tid, pid, cur_ts, row_value, &kv.second); + bool ret = client->Put(tid, pid, cur_ts, row_value, &kv.second, + insert_memory_usage_limit_.load(std::memory_order_relaxed)); if (!ret) { SET_STATUS_AND_WARN(status, StatusCode::kCmdError, "INSERT failed, tid " + std::to_string(tid) + @@ -2840,6 +2843,8 @@ std::shared_ptr SQLClusterRouter::ExecuteSQL( ::openmldb::base::Status base_status; if (is_online_mode) { // Handle in online mode + config.emplace("insert_memory_usage_limit", + std::to_string(insert_memory_usage_limit_.load(std::memory_order_relaxed))); base_status = ImportOnlineData(sql, config, database, is_sync_job, offline_job_timeout, &job_info); } else { // Handle in offline mode @@ -3083,6 +3088,15 @@ ::hybridse::sdk::Status SQLClusterRouter::SetVariable(hybridse::node::SetPlanNod if (!absl::SimpleAtoi(value, &new_timeout)) { return {StatusCode::kCmdError, "Fail to parse value, can't set the request timeout"}; } + } else if (key == "insert_memory_usage_limit") { + int limit = 0; + if (!absl::SimpleAtoi(value, &limit)) { + return {StatusCode::kCmdError, "Fail to parse value, can't set the insert_memory_usage_limit"}; + } + if (limit < 0 || limit > 100) { + return {StatusCode::kCmdError, "Invalid value! The value must be between 0 and 100"}; + } + insert_memory_usage_limit_.store(limit, std::memory_order_relaxed); } else { return {}; } diff --git a/src/sdk/sql_cluster_router.h b/src/sdk/sql_cluster_router.h index f5661c9a1bb..1ed927e6bc9 100644 --- a/src/sdk/sql_cluster_router.h +++ b/src/sdk/sql_cluster_router.h @@ -428,6 +428,7 @@ class SQLClusterRouter : public SQLRouter { input_lru_cache_; ::openmldb::base::SpinMutex mu_; ::openmldb::base::Random rand_; + std::atomic insert_memory_usage_limit_ = 0; // [0-100], the default value 0 means unlimited }; class Bias { diff --git a/src/sdk/sql_router.h b/src/sdk/sql_router.h index 85cfa03f492..68186a83b00 100644 --- a/src/sdk/sql_router.h +++ b/src/sdk/sql_router.h @@ -49,7 +49,6 @@ struct BasicRouterOptions { int glog_level = 0; // empty means to stderr std::string glog_dir = ""; - int insert_memory_usage_limit = 0; }; struct SQLRouterOptions : BasicRouterOptions { From 28db23230f81ff994ef9858237b981327988696a Mon Sep 17 00:00:00 2001 From: 4paradigm <4paradigm@denglong.local> Date: Fri, 1 Dec 2023 16:57:03 +0800 Subject: [PATCH 05/10] feat: add insertMemoryUsageLimit to offline job --- .../java/com/_4paradigm/openmldb/spark/OpenmldbSource.java | 6 +++++- .../java/com/_4paradigm/openmldb/spark/OpenmldbTable.java | 6 ++++-- .../openmldb/spark/write/OpenmldbDataSingleWriter.java | 1 + .../_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java | 1 + .../openmldb/spark/write/OpenmldbWriteConfig.java | 4 +++- 5 files changed, 14 insertions(+), 4 deletions(-) diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java index 5595e313af5..511aebd97b0 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java @@ -40,6 +40,7 @@ public class OpenmldbSource implements TableProvider, DataSourceRegister { // single: insert when read one row // batch: insert when commit(after read a whole partition) private String writerType = "single"; + private int insertMemoryUsageLimit = 0; @Override public StructType inferSchema(CaseInsensitiveStringMap options) { @@ -70,12 +71,15 @@ public StructType inferSchema(CaseInsensitiveStringMap options) { writerType = options.get("writerType"); } + if (options.containsKey("insert_memory_usage_limit")) { + insertMemoryUsageLimit = Integer.parseInt(options.get("insert_memory_usage_limit")); + } return null; } @Override public Table getTable(StructType schema, Transform[] partitioning, Map properties) { - return new OpenmldbTable(dbName, tableName, option, writerType); + return new OpenmldbTable(dbName, tableName, option, writerType, insertMemoryUsageLimit); } @Override diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbTable.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbTable.java index 0cf98b7d19e..481a9cc1f4c 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbTable.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbTable.java @@ -49,15 +49,17 @@ public class OpenmldbTable implements SupportsWrite, SupportsRead { private final String tableName; private final SdkOption option; private final String writerType; + private final int insertMemoryUsageLimit; private SqlExecutor executor = null; private Set capabilities; - public OpenmldbTable(String dbName, String tableName, SdkOption option, String writerType) { + public OpenmldbTable(String dbName, String tableName, SdkOption option, String writerType, int insertMemoryUsageLimit) { this.dbName = dbName; this.tableName = tableName; this.option = option; this.writerType = writerType; + this.insertMemoryUsageLimit = insertMemoryUsageLimit; try { this.executor = new SqlClusterExecutor(option); // no need to check table exists, schema() will check it later @@ -69,7 +71,7 @@ public OpenmldbTable(String dbName, String tableName, SdkOption option, String w @Override public WriteBuilder newWriteBuilder(LogicalWriteInfo info) { - OpenmldbWriteConfig config = new OpenmldbWriteConfig(dbName, tableName, option, writerType); + OpenmldbWriteConfig config = new OpenmldbWriteConfig(dbName, tableName, option, writerType, insertMemoryUsageLimit); return new OpenmldbWriteBuilder(config, info); } diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java index fd767d4eebb..9ef329620b3 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java @@ -47,6 +47,7 @@ public OpenmldbDataSingleWriter(OpenmldbWriteConfig config, int partitionId, lon SqlClusterExecutor executor = new SqlClusterExecutor(option); String dbName = config.dbName; String tableName = config.tableName; + executor.executeSQL(dbName, "SET @@insert_memory_usage_limit=" + config.insertMemoryUsageLimit); Schema schema = executor.getTableSchema(dbName, tableName); // create insert placeholder diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java index 3cb7632ae0b..5e293d10751 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java @@ -47,6 +47,7 @@ public OpenmldbDataWriter(OpenmldbWriteConfig config, int partitionId, long task SqlClusterExecutor executor = new SqlClusterExecutor(option); String dbName = config.dbName; String tableName = config.tableName; + executor.executeSQL(dbName, "SET @@insert_memory_usage_limit=" + config.insertMemoryUsageLimit); Schema schema = executor.getTableSchema(dbName, tableName); // create insert placeholder diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java index 89c2d801ca5..c7fb68a9cf6 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java @@ -24,13 +24,15 @@ // Must serializable public class OpenmldbWriteConfig implements Serializable { public final String dbName, tableName, zkCluster, zkPath, writerType; + public int insertMemoryUsageLimit; - public OpenmldbWriteConfig(String dbName, String tableName, SdkOption option, String writerType) { + public OpenmldbWriteConfig(String dbName, String tableName, SdkOption option, String writerType, int insertMemoryUsageLimit) { this.dbName = dbName; this.tableName = tableName; this.zkCluster = option.getZkCluster(); this.zkPath = option.getZkPath(); this.writerType = writerType; + this.insertMemoryUsageLimit = insertMemoryUsageLimit; // TODO(hw): other configs in SdkOption } } From 81fd827c6593d43c870f1dc8eec201bc86101f55 Mon Sep 17 00:00:00 2001 From: 4paradigm <4paradigm@denglong.local> Date: Fri, 1 Dec 2023 17:30:28 +0800 Subject: [PATCH 06/10] refact: add final --- .../_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java index c7fb68a9cf6..80007b14ae5 100644 --- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java +++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java @@ -24,7 +24,7 @@ // Must serializable public class OpenmldbWriteConfig implements Serializable { public final String dbName, tableName, zkCluster, zkPath, writerType; - public int insertMemoryUsageLimit; + public final int insertMemoryUsageLimit; public OpenmldbWriteConfig(String dbName, String tableName, SdkOption option, String writerType, int insertMemoryUsageLimit) { this.dbName = dbName; From 867e8d51bc7e03f2190f56e688475999956fa69f Mon Sep 17 00:00:00 2001 From: dl239 Date: Fri, 1 Dec 2023 18:06:06 +0800 Subject: [PATCH 07/10] docs: add doc --- docs/en/reference/sql/ddl/SET_STATEMENT.md | 1 + docs/en/reference/sql/dml/LOAD_DATA_STATEMENT.md | 1 + docs/zh/openmldb_sql/ddl/SET_STATEMENT.md | 1 + docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md | 2 ++ 4 files changed, 5 insertions(+) diff --git a/docs/en/reference/sql/ddl/SET_STATEMENT.md b/docs/en/reference/sql/ddl/SET_STATEMENT.md index 6c0e83de75a..25d03370eaf 100644 --- a/docs/en/reference/sql/ddl/SET_STATEMENT.md +++ b/docs/en/reference/sql/ddl/SET_STATEMENT.md @@ -35,6 +35,7 @@ The following format is also equivalent. | @@session.sync_job|@@sync_job | When the value is `true`, the offline command will be executed synchronously, waiting for the final result of the execution.
When the value is `false`, the offline command returns immediately. If you need to check the execution, please use `SHOW JOB` command. | `true`,
`false` | `false` | | @@session.sync_timeout|@@sync_timeout | When `sync_job=true`, you can configure the waiting time for synchronization commands. The timeout will return immediately. After the timeout returns, you can still view the command execution through `SHOW JOB`. | Int | 20000 | | @@session.spark_config|@@spark_config | Set the Spark configuration for offline jobs, configure like 'spark.executor.memory=2g;spark.executor.cores=2'. Notice that the priority of this Spark configuration is higer than TaskManager Spark configuration but lower than CLI Spark configuration file. | String | "" | +| @@session.insert_memory_usage_limit |@@insert_memory_usage_limit | Set server memory usage limit when inserting or importing data. If the server memory usage exceeds the set value, the insertion will fail. The value range is 0-100. 0 means unlimited | Int | "0" | ## Example diff --git a/docs/en/reference/sql/dml/LOAD_DATA_STATEMENT.md b/docs/en/reference/sql/dml/LOAD_DATA_STATEMENT.md index 8210b92217c..9e78732d212 100644 --- a/docs/en/reference/sql/dml/LOAD_DATA_STATEMENT.md +++ b/docs/en/reference/sql/dml/LOAD_DATA_STATEMENT.md @@ -62,6 +62,7 @@ The following table introduces the parameters of `LOAD DATA INFILE`. - As metioned in the above table, online execution mode only supports append input mode. - When `deep_copy=false`, OpenMLDB doesn't support to modify the data in the soft link. Therefore, if the current offline data comes from a soft link, `append` import is no longer supported. Moreover, if current connection is soft copy, using the hard copy with `overwrite` will not delete the data of the soft connection. +- If the `insert_memory_usage_limit` session variable is set, a failure will be returned if the server memory usage exceeds the set value during online import ``` diff --git a/docs/zh/openmldb_sql/ddl/SET_STATEMENT.md b/docs/zh/openmldb_sql/ddl/SET_STATEMENT.md index 1b513913e10..4b63861e59f 100644 --- a/docs/zh/openmldb_sql/ddl/SET_STATEMENT.md +++ b/docs/zh/openmldb_sql/ddl/SET_STATEMENT.md @@ -36,6 +36,7 @@ sessionVariableName ::= '@@'Identifier | '@@session.'Identifier | '@@global.'Ide | @@session.sync_job|@@sync_job | 当该变量值为 `true`,离线的命令将变为同步,等待执行的最终结果。
当该变量值为 `false`,离线的命令即时返回,若要查看命令的执行情况,请使用`SHOW JOB`。 | "true" \| "false" | "false" | | @@session.job_timeout|@@job_timeout | 可配置离线异步命令或离线管理命令的等待时间(以*毫秒*为单位),将立即返回。离线异步命令返回后仍可通过`SHOW JOB`查看命令执行情况。 | Int | "20000" | | @@session.spark_config|@@spark_config | 设置离线任务的 Spark 参数,配置项参考 'spark.executor.memory=2g;spark.executor.cores=2'。注意此 Spark 配置优先级高于 TaskManager 默认 Spark 配置,低于命令行的 Spark 配置文件。 | String | "" | +| @@session.insert_memory_usage_limit|@@insert_memory_usage_limit | 设置数据插入或者数据导入时服务端内存使用率限制。取值范围为0-100。如果服务端内存使用率超过设置的值,就会插入失败。设置为0表示不限制 | Int | "0" | ## Example ### 设置和显示会话系统变量 diff --git a/docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md b/docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md index b51ab8306d3..a9aa2ffccbe 100644 --- a/docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md +++ b/docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md @@ -113,6 +113,8 @@ LOAD DATA INFILE 'hive://db1.t1' INTO TABLE t1; 在线导入只允许`mode='append'`,无法`overwrite`或`error_if_exists`。 +如果设置了 `insert_memory_usage_limit` session变量,服务端内存使用率超过设定的值就会返回失败。 + ## 离线导入规则 表的离线信息可通过`desc
`查看。我们将数据地址分为两类,离线地址是OpenMLDB的内部存储路径,硬拷贝将写入此地址,仅一个;软链接地址是软链接导入的地址列表。 From af34cc0d35d1370e393474a50f848a321c349ca6 Mon Sep 17 00:00:00 2001 From: dl239 Date: Tue, 5 Dec 2023 14:22:09 +0800 Subject: [PATCH 08/10] fix: add error message --- src/base/sys_info_test.cc | 3 --- src/client/tablet_client.cc | 31 +++++++++++++--------------- src/client/tablet_client.h | 6 +++--- src/cmd/openmldb.cc | 2 +- src/nameserver/name_server_impl.cc | 2 +- src/replica/snapshot_replica_test.cc | 12 +++++------ src/sdk/mini_cluster_batch_bm.cc | 2 +- src/sdk/sql_cluster_router.cc | 12 +++++------ src/tablet/tablet_impl.cc | 8 +++++-- 9 files changed, 38 insertions(+), 40 deletions(-) diff --git a/src/base/sys_info_test.cc b/src/base/sys_info_test.cc index 2ee372cde1f..45fd05c0572 100644 --- a/src/base/sys_info_test.cc +++ b/src/base/sys_info_test.cc @@ -34,9 +34,6 @@ TEST_F(SystemInfoTest, GetMemory) { ASSERT_GT(info.mem_used, 0); ASSERT_GT(info.mem_available, 0); ASSERT_EQ(info.mem_total, info.mem_used + info.mem_available); - printf("total:%lu\n", info.mem_total); - printf("used:%lu\n", info.mem_used); - printf("avl:%lu\n", info.mem_available); } } // namespace base diff --git a/src/client/tablet_client.cc b/src/client/tablet_client.cc index 4fade044455..afaa5cf395a 100644 --- a/src/client/tablet_client.cc +++ b/src/client/tablet_client.cc @@ -201,7 +201,7 @@ bool TabletClient::UpdateTableMetaForAddField(uint32_t tid, const std::vector>& dimensions, int memory_usage_limit) { ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension> pb_dimensions; @@ -213,12 +213,12 @@ bool TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const std::str return Put(tid, pid, time, base::Slice(value), &pb_dimensions, memory_usage_limit); } -bool TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Slice& value, +base::Status TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Slice& value, ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>* dimensions, int memory_usage_limit) { ::openmldb::api::PutRequest request; if (memory_usage_limit < 0 || memory_usage_limit > 100) { - return false; + return {base::ReturnCode::kError, absl::StrCat("invalid memory_usage_limit ", memory_usage_limit)}; } else if (memory_usage_limit > 0) { request.set_memory_limit(memory_usage_limit); } @@ -228,16 +228,15 @@ bool TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Sl request.set_pid(pid); request.mutable_dimensions()->Swap(dimensions); ::openmldb::api::PutResponse response; - bool ok = - client_.SendRequest(&::openmldb::api::TabletServer_Stub::Put, &request, &response, FLAGS_request_timeout_ms, 1); - if (ok && response.code() == 0) { - return true; + auto st = client_.SendRequestSt(&::openmldb::api::TabletServer_Stub::Put, + &request, &response, FLAGS_request_timeout_ms, 1); + if (!st.OK()) { + return st; } - LOG(WARNING) << "fail to send write request for " << response.msg() << " and error code " << response.code(); - return false; + return {response.code(), response.msg()}; } -bool TabletClient::Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, const std::string& value) { +base::Status TabletClient::Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, const std::string& value) { ::openmldb::api::PutRequest request; auto dim = request.add_dimensions(); dim->set_key(pk); @@ -247,14 +246,12 @@ bool TabletClient::Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64 request.set_tid(tid); request.set_pid(pid); ::openmldb::api::PutResponse response; - - bool ok = - client_.SendRequest(&::openmldb::api::TabletServer_Stub::Put, &request, &response, FLAGS_request_timeout_ms, 1); - if (ok && response.code() == 0) { - return true; + auto st = client_.SendRequestSt(&::openmldb::api::TabletServer_Stub::Put, + &request, &response, FLAGS_request_timeout_ms, 1); + if (!st.OK()) { + return st; } - LOG(WARNING) << "fail to put for error " << response.msg(); - return false; + return {response.code(), response.msg()}; } bool TabletClient::MakeSnapshot(uint32_t tid, uint32_t pid, uint64_t offset, std::shared_ptr task_info) { diff --git a/src/client/tablet_client.h b/src/client/tablet_client.h index c09c3c4532e..b4866b77618 100644 --- a/src/client/tablet_client.h +++ b/src/client/tablet_client.h @@ -72,13 +72,13 @@ class TabletClient : public Client { std::shared_ptr<::openmldb::sdk::SQLRequestRowBatch>, brpc::Controller* cntl, ::openmldb::api::SQLBatchRequestQueryResponse* response, const bool is_debug = false); - bool Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, const std::string& value); + base::Status Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, const std::string& value); - bool Put(uint32_t tid, uint32_t pid, uint64_t time, const std::string& value, + base::Status Put(uint32_t tid, uint32_t pid, uint64_t time, const std::string& value, const std::vector>& dimensions, int memory_usage_limit = 0); - bool Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Slice& value, + base::Status Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Slice& value, ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>* dimensions, int memory_usage_limit = 0); diff --git a/src/cmd/openmldb.cc b/src/cmd/openmldb.cc index b4d12210cdf..8d0d9b692f5 100644 --- a/src/cmd/openmldb.cc +++ b/src/cmd/openmldb.cc @@ -304,7 +304,7 @@ int PutData(uint32_t tid, const std::mapPut(tid, pid, ts, value, iter->second)) { + if (!clients[endpoint]->Put(tid, pid, ts, value, iter->second).OK()) { printf("put failed. tid %u pid %u endpoint %s ts %lu \n", tid, pid, endpoint.c_str(), ts); return -1; } diff --git a/src/nameserver/name_server_impl.cc b/src/nameserver/name_server_impl.cc index d9ce3aff439..2bfb675bf3c 100644 --- a/src/nameserver/name_server_impl.cc +++ b/src/nameserver/name_server_impl.cc @@ -9908,7 +9908,7 @@ base::Status NameServerImpl::InitGlobalVarTable() { uint64_t cur_ts = ::baidu::common::timer::get_micros() / 1000; std::string endpoint = table_info->table_partition(0).partition_meta(meta_idx).endpoint(); auto table_ptr = GetTablet(endpoint); - if (!table_ptr->client_->Put(tid, pid, cur_ts, row, dimensions)) { + if (!table_ptr->client_->Put(tid, pid, cur_ts, row, dimensions).OK()) { return {ReturnCode::kPutFailed, "fail to make a put request to table"}; } break; diff --git a/src/replica/snapshot_replica_test.cc b/src/replica/snapshot_replica_test.cc index 05e9a9d01da..64b8c2565ff 100644 --- a/src/replica/snapshot_replica_test.cc +++ b/src/replica/snapshot_replica_test.cc @@ -130,7 +130,7 @@ TEST_P(SnapshotReplicaTest, LeaderAndFollower) { ASSERT_TRUE(status.OK()); uint64_t cur_time = ::baidu::common::timer::get_micros() / 1000; auto ret = client.Put(tid, pid, "testkey", cur_time, ::openmldb::test::EncodeKV("testkey", "value1")); - ASSERT_TRUE(ret); + ASSERT_TRUE(ret.OK()); uint32_t count = 0; while (count < 10) { @@ -185,7 +185,7 @@ TEST_P(SnapshotReplicaTest, LeaderAndFollower) { ASSERT_EQ(0, srp.code()); ret = client.Put(tid, pid, "newkey", cur_time, ::openmldb::test::EncodeKV("newkey", "value2")); - ASSERT_TRUE(ret); + ASSERT_TRUE(ret.OK()); sleep(2); sr.set_pk("newkey"); tablet1->Scan(NULL, &sr, &srp, &closure); @@ -240,7 +240,7 @@ TEST_P(SnapshotReplicaTest, SendSnapshot) { ASSERT_TRUE(status.OK()); uint64_t cur_time = ::baidu::common::timer::get_micros() / 1000; auto ret = client.Put(tid, pid, "testkey", cur_time, ::openmldb::test::EncodeKV("testkey", "value1")); - ASSERT_TRUE(ret); + ASSERT_TRUE(ret.OK()); uint32_t count = 0; while (count < 10) { @@ -351,7 +351,7 @@ TEST_P(SnapshotReplicaTest, IncompleteSnapshot) { 16, 0, ::openmldb::type::CompressType::kNoCompress, storage_mode)); ASSERT_TRUE(status.OK()); auto ret = client.Put(tid, pid, "testkey", cur_time, ::openmldb::test::EncodeKV("testkey", "value1")); - ASSERT_TRUE(ret); + ASSERT_TRUE(ret.OK()); uint32_t count = 0; while (count < 10) { @@ -420,7 +420,7 @@ TEST_P(SnapshotReplicaTest, IncompleteSnapshot) { ASSERT_EQ(0, srp.code()); std::string key = "test2"; - ASSERT_TRUE(client.Put(tid, pid, key, cur_time, ::openmldb::test::EncodeKV(key, key))); + ASSERT_TRUE(client.Put(tid, pid, key, cur_time, ::openmldb::test::EncodeKV(key, key)).OK()); sr.set_tid(tid); sr.set_pid(pid); @@ -583,7 +583,7 @@ TEST_P(SnapshotReplicaTest, LeaderAndFollowerTS) { std::vector row = {"card0", "mcc0", "1.3", std::to_string(cur_time), std::to_string(cur_time - 100)}; std::string value; sdk_codec.EncodeRow(row, &value); - ASSERT_TRUE(client.Put(tid, pid, cur_time, value, dimensions)); + ASSERT_TRUE(client.Put(tid, pid, cur_time, value, dimensions).OK()); sleep(3); ::openmldb::test::TempPath temp_path; diff --git a/src/sdk/mini_cluster_batch_bm.cc b/src/sdk/mini_cluster_batch_bm.cc index 1b5227f3367..8dc4e9e665e 100644 --- a/src/sdk/mini_cluster_batch_bm.cc +++ b/src/sdk/mini_cluster_batch_bm.cc @@ -96,7 +96,7 @@ static void BM_SimpleQueryFunction(benchmark::State& state) { // NOLINT uint32_t tid = sdk.GetTableId(db, name); { for (int32_t i = 0; i < 1000; i++) { - ok = tablet[0]->GetClient()->Put(tid, 0, pk, ts + i, value); + tablet[0]->GetClient()->Put(tid, 0, pk, ts + i, value); } } std::string sql = "select col1, col2 + 1, col3, col4, col5 from " + name + " ;"; diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index cc64b845e92..6cea90a48cf 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -1366,11 +1366,11 @@ bool SQLClusterRouter::PutRow(uint32_t tid, const std::shared_ptr& if (client) { DLOG(INFO) << "put data to endpoint " << client->GetEndpoint() << " with dimensions size " << kv.second.size(); - bool ret = client->Put(tid, pid, cur_ts, row->GetRow(), kv.second, + auto ret = client->Put(tid, pid, cur_ts, row->GetRow(), kv.second, insert_memory_usage_limit_.load(std::memory_order_relaxed)); - if (!ret) { + if (!ret.OK()) { SET_STATUS_AND_WARN(status, StatusCode::kCmdError, - "INSERT failed, tid " + std::to_string(tid) + + "INSERT failed, tid " + std::to_string(tid) + ", error msg " + ret.GetMsg() + ". Note that data might have been partially inserted. " "You are encouraged to perform DELETE to remove any partially " "inserted data before trying INSERT again."); @@ -1483,11 +1483,11 @@ bool SQLClusterRouter::ExecuteInsert(const std::string& db, const std::string& n if (client) { DLOG(INFO) << "put data to endpoint " << client->GetEndpoint() << " with dimensions size " << kv.second.size(); - bool ret = client->Put(tid, pid, cur_ts, row_value, &kv.second, + auto ret = client->Put(tid, pid, cur_ts, row_value, &kv.second, insert_memory_usage_limit_.load(std::memory_order_relaxed)); - if (!ret) { + if (!ret.OK()) { SET_STATUS_AND_WARN(status, StatusCode::kCmdError, - "INSERT failed, tid " + std::to_string(tid) + + "INSERT failed, tid " + std::to_string(tid) + ", error msg " + ret.GetMsg() + ". Note that data might have been partially inserted. " "You are encouraged to perform DELETE to remove any partially " "inserted data before trying INSERT again."); diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index 829b5f7593f..c313bf6be72 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -445,8 +445,12 @@ bool TabletImpl::CheckGetDone(::openmldb::api::GetType type, uint64_t ts, uint64 void TabletImpl::UpdateMemoryUsage() { base::SysInfo info; if (auto status = base::GetSysMem(&info); status.OK()) { - system_memory_usage_rate_.store(info.mem_used * 100 / info.mem_total, std::memory_order_relaxed); - DEBUGLOG("system_memory_usage_rate is %u", system_memory_usage_rate_.load(std::memory_order_relaxed)); + if (info.mem_total > 0) { + system_memory_usage_rate_.store(info.mem_used * 100 / info.mem_total, std::memory_order_relaxed); + DEBUGLOG("system_memory_usage_rate is %u", system_memory_usage_rate_.load(std::memory_order_relaxed)); + } else { + PDLOG(WARNING, "total memory is zero"); + } } else { PDLOG(WARNING, "GetSysMem run failed. error message %s", status.GetMsg().c_str()); } From 69abfd63cffee1f456249c4a807a90746ec2541d Mon Sep 17 00:00:00 2001 From: dl239 Date: Tue, 5 Dec 2023 16:15:55 +0800 Subject: [PATCH 09/10] fix: fix comment --- src/base/sys_info.h | 41 +++++++++++++++++++++++++++---------- src/base/sys_info_test.cc | 9 ++++++-- src/client/tablet_client.cc | 3 ++- 3 files changed, 39 insertions(+), 14 deletions(-) diff --git a/src/base/sys_info.h b/src/base/sys_info.h index 6c8e72167b0..b1b7b6e010b 100644 --- a/src/base/sys_info.h +++ b/src/base/sys_info.h @@ -26,12 +26,17 @@ namespace openmldb::base { constexpr const char* MEM_TOTAL = "MemTotal"; -constexpr const char* MEM_AVAILABLE = "MemAvailable"; +constexpr const char* MEM_BUFFERS = "Buffers"; +constexpr const char* MEM_CACHED = "Cached"; +constexpr const char* MEM_FREE = "MemFree"; +constexpr const char* SLAB = "Slab"; struct SysInfo { - uint64_t mem_total = 0; // unit is kB - uint64_t mem_used = 0; // unit is kB - uint64_t mem_available = 0; // unit is kB + uint64_t mem_total = 0; // unit is kB + uint64_t mem_used = 0; // unit is kB + uint64_t mem_free = 0; // unit is kB + uint64_t mem_buffers = 0; // unit is kB + uint64_t mem_cached = 0; // unit is kB }; base::Status GetSysMem(SysInfo* info) { @@ -51,6 +56,7 @@ base::Status GetSysMem(SysInfo* info) { return {}; }; int parse_cnt = 0; + uint64_t slab = 0; while (fgets(line, sizeof(line), fd)) { absl::string_view str_view(line); str_view = absl::StripAsciiWhitespace(str_view); @@ -59,20 +65,33 @@ base::Status GetSysMem(SysInfo* info) { return status; } parse_cnt++; - } else if (absl::StartsWith(str_view, MEM_AVAILABLE)) { - if (auto status = parse(str_view, MEM_AVAILABLE, &info->mem_available); !status.OK()) { + } else if (absl::StartsWith(str_view, MEM_BUFFERS)) { + if (auto status = parse(str_view, MEM_BUFFERS, &info->mem_buffers); !status.OK()) { + return status; + } + parse_cnt++; + } else if (absl::StartsWith(str_view, MEM_CACHED)) { + if (auto status = parse(str_view, MEM_CACHED, &info->mem_cached); !status.OK()) { + return status; + } + parse_cnt++; + } else if (absl::StartsWith(str_view, MEM_FREE)) { + if (auto status = parse(str_view, MEM_FREE, &info->mem_free); !status.OK()) { + return status; + } + parse_cnt++; + } else if (absl::StartsWith(str_view, SLAB)) { + if (auto status = parse(str_view, SLAB, &slab); !status.OK()) { return status; } parse_cnt++; - } - if (parse_cnt >= 2) { - break; } } - if (parse_cnt != 2) { + if (parse_cnt != 5) { return {ReturnCode::kError, "fail to parse meminfo"}; } - info->mem_used = info->mem_total - info->mem_available; + info->mem_cached += slab; + info->mem_used = info->mem_total - info->mem_buffers - info->mem_cached - info->mem_free; fclose(fd); #endif return {}; diff --git a/src/base/sys_info_test.cc b/src/base/sys_info_test.cc index 45fd05c0572..4d5f5cc03c8 100644 --- a/src/base/sys_info_test.cc +++ b/src/base/sys_info_test.cc @@ -32,8 +32,13 @@ TEST_F(SystemInfoTest, GetMemory) { ASSERT_TRUE(status.OK()); ASSERT_GT(info.mem_total, 0); ASSERT_GT(info.mem_used, 0); - ASSERT_GT(info.mem_available, 0); - ASSERT_EQ(info.mem_total, info.mem_used + info.mem_available); + ASSERT_GT(info.mem_free, 0); + ASSERT_EQ(info.mem_total, info.mem_used + info.mem_buffers + info.mem_free + info.mem_cached); + /*printf("total:%lu\n", info.mem_total); + printf("used:%lu\n", info.mem_used); + printf("free:%lu\n", info.mem_free); + printf("buffers:%lu\n", info.mem_buffers); + printf("cached:%lu\n", info.mem_cached);*/ } } // namespace base diff --git a/src/client/tablet_client.cc b/src/client/tablet_client.cc index afaa5cf395a..54b2a8c9cec 100644 --- a/src/client/tablet_client.cc +++ b/src/client/tablet_client.cc @@ -236,7 +236,8 @@ base::Status TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const return {response.code(), response.msg()}; } -base::Status TabletClient::Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, const std::string& value) { +base::Status TabletClient::Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, + const std::string& value) { ::openmldb::api::PutRequest request; auto dim = request.add_dimensions(); dim->set_key(pk); From e880a2830957573e00badf7fe15c81e8ba2bb830 Mon Sep 17 00:00:00 2001 From: dl239 Date: Fri, 8 Dec 2023 16:39:53 +0800 Subject: [PATCH 10/10] fix: fix comment --- src/base/sys_info.h | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/base/sys_info.h b/src/base/sys_info.h index b1b7b6e010b..4b61b5e22d4 100644 --- a/src/base/sys_info.h +++ b/src/base/sys_info.h @@ -29,7 +29,12 @@ constexpr const char* MEM_TOTAL = "MemTotal"; constexpr const char* MEM_BUFFERS = "Buffers"; constexpr const char* MEM_CACHED = "Cached"; constexpr const char* MEM_FREE = "MemFree"; -constexpr const char* SLAB = "Slab"; +constexpr const char* SRECLAIMABLE = "SReclaimable"; + +/* We calculate MemAvailable as follows + * MemAvailable = MemFree + Buffers + Cached + SReclaimable + * refer https://www.kernel.org/doc/Documentation/filesystems/proc.txt + * */ struct SysInfo { uint64_t mem_total = 0; // unit is kB @@ -56,7 +61,7 @@ base::Status GetSysMem(SysInfo* info) { return {}; }; int parse_cnt = 0; - uint64_t slab = 0; + uint64_t s_reclaimable = 0; while (fgets(line, sizeof(line), fd)) { absl::string_view str_view(line); str_view = absl::StripAsciiWhitespace(str_view); @@ -80,8 +85,8 @@ base::Status GetSysMem(SysInfo* info) { return status; } parse_cnt++; - } else if (absl::StartsWith(str_view, SLAB)) { - if (auto status = parse(str_view, SLAB, &slab); !status.OK()) { + } else if (absl::StartsWith(str_view, SRECLAIMABLE)) { + if (auto status = parse(str_view, SRECLAIMABLE, &s_reclaimable); !status.OK()) { return status; } parse_cnt++; @@ -90,7 +95,7 @@ base::Status GetSysMem(SysInfo* info) { if (parse_cnt != 5) { return {ReturnCode::kError, "fail to parse meminfo"}; } - info->mem_cached += slab; + info->mem_cached += s_reclaimable; info->mem_used = info->mem_total - info->mem_buffers - info->mem_cached - info->mem_free; fclose(fd); #endif