diff --git a/docs/en/openmldb_sql/dml/LOAD_DATA_STATEMENT.md b/docs/en/openmldb_sql/dml/LOAD_DATA_STATEMENT.md
index e10a91e98be..d8998fbf3ce 100644
--- a/docs/en/openmldb_sql/dml/LOAD_DATA_STATEMENT.md
+++ b/docs/en/openmldb_sql/dml/LOAD_DATA_STATEMENT.md
@@ -65,6 +65,7 @@ The following table introduces the parameters of `LOAD DATA INFILE`.
- As metioned in the above table, online execution mode only supports append input mode.
- When `deep_copy=false`, OpenMLDB doesn't support to modify the data in the soft link. Therefore, if the current offline data comes from a soft link, `append` import is no longer supported. Moreover, if current connection is soft copy, using the hard copy with `overwrite` will not delete the data of the soft connection.
+- If the `insert_memory_usage_limit` session variable is set, a failure will be returned if the server memory usage exceeds the set value during online import
```
diff --git a/docs/en/reference/sql/ddl/SET_STATEMENT.md b/docs/en/reference/sql/ddl/SET_STATEMENT.md
index 6c0e83de75a..25d03370eaf 100644
--- a/docs/en/reference/sql/ddl/SET_STATEMENT.md
+++ b/docs/en/reference/sql/ddl/SET_STATEMENT.md
@@ -35,6 +35,7 @@ The following format is also equivalent.
| @@session.sync_job|@@sync_job | When the value is `true`, the offline command will be executed synchronously, waiting for the final result of the execution.
When the value is `false`, the offline command returns immediately. If you need to check the execution, please use `SHOW JOB` command. | `true`,
`false` | `false` |
| @@session.sync_timeout|@@sync_timeout | When `sync_job=true`, you can configure the waiting time for synchronization commands. The timeout will return immediately. After the timeout returns, you can still view the command execution through `SHOW JOB`. | Int | 20000 |
| @@session.spark_config|@@spark_config | Set the Spark configuration for offline jobs, configure like 'spark.executor.memory=2g;spark.executor.cores=2'. Notice that the priority of this Spark configuration is higer than TaskManager Spark configuration but lower than CLI Spark configuration file. | String | "" |
+| @@session.insert_memory_usage_limit |@@insert_memory_usage_limit | Set server memory usage limit when inserting or importing data. If the server memory usage exceeds the set value, the insertion will fail. The value range is 0-100. 0 means unlimited | Int | "0" |
## Example
diff --git a/docs/zh/openmldb_sql/ddl/SET_STATEMENT.md b/docs/zh/openmldb_sql/ddl/SET_STATEMENT.md
index 1b513913e10..4b63861e59f 100644
--- a/docs/zh/openmldb_sql/ddl/SET_STATEMENT.md
+++ b/docs/zh/openmldb_sql/ddl/SET_STATEMENT.md
@@ -36,6 +36,7 @@ sessionVariableName ::= '@@'Identifier | '@@session.'Identifier | '@@global.'Ide
| @@session.sync_job|@@sync_job | 当该变量值为 `true`,离线的命令将变为同步,等待执行的最终结果。
当该变量值为 `false`,离线的命令即时返回,若要查看命令的执行情况,请使用`SHOW JOB`。 | "true" \| "false" | "false" |
| @@session.job_timeout|@@job_timeout | 可配置离线异步命令或离线管理命令的等待时间(以*毫秒*为单位),将立即返回。离线异步命令返回后仍可通过`SHOW JOB`查看命令执行情况。 | Int | "20000" |
| @@session.spark_config|@@spark_config | 设置离线任务的 Spark 参数,配置项参考 'spark.executor.memory=2g;spark.executor.cores=2'。注意此 Spark 配置优先级高于 TaskManager 默认 Spark 配置,低于命令行的 Spark 配置文件。 | String | "" |
+| @@session.insert_memory_usage_limit|@@insert_memory_usage_limit | 设置数据插入或者数据导入时服务端内存使用率限制。取值范围为0-100。如果服务端内存使用率超过设置的值,就会插入失败。设置为0表示不限制 | Int | "0" |
## Example
### 设置和显示会话系统变量
diff --git a/docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md b/docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md
index 4fcc94c15fc..b3c7ffc55bf 100644
--- a/docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md
+++ b/docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md
@@ -115,6 +115,8 @@ LOAD DATA INFILE 'hive://db1.t1' INTO TABLE t1;
在线导入只允许`mode='append'`,无法`overwrite`或`error_if_exists`。
+如果设置了 `insert_memory_usage_limit` session变量,服务端内存使用率超过设定的值就会返回失败。
+
## 离线导入规则
表的离线信息可通过`desc
`查看。我们将数据地址分为两类,离线地址是OpenMLDB的内部存储路径,硬拷贝将写入此地址,仅一个;软链接地址是软链接导入的地址列表。
diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java
index 978c3cca694..7e626f623ea 100644
--- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java
+++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java
@@ -40,6 +40,7 @@ public class OpenmldbSource implements TableProvider, DataSourceRegister {
// single: insert when read one row
// batch: insert when commit(after read a whole partition)
private String writerType = "single";
+ private int insertMemoryUsageLimit = 0;
@Override
public StructType inferSchema(CaseInsensitiveStringMap options) {
@@ -71,12 +72,15 @@ public StructType inferSchema(CaseInsensitiveStringMap options) {
writerType = options.get("writerType");
}
+ if (options.containsKey("insert_memory_usage_limit")) {
+ insertMemoryUsageLimit = Integer.parseInt(options.get("insert_memory_usage_limit"));
+ }
return null;
}
@Override
public Table getTable(StructType schema, Transform[] partitioning, Map properties) {
- return new OpenmldbTable(dbName, tableName, option, writerType);
+ return new OpenmldbTable(dbName, tableName, option, writerType, insertMemoryUsageLimit);
}
@Override
diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbTable.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbTable.java
index 0cf98b7d19e..481a9cc1f4c 100644
--- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbTable.java
+++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbTable.java
@@ -49,15 +49,17 @@ public class OpenmldbTable implements SupportsWrite, SupportsRead {
private final String tableName;
private final SdkOption option;
private final String writerType;
+ private final int insertMemoryUsageLimit;
private SqlExecutor executor = null;
private Set capabilities;
- public OpenmldbTable(String dbName, String tableName, SdkOption option, String writerType) {
+ public OpenmldbTable(String dbName, String tableName, SdkOption option, String writerType, int insertMemoryUsageLimit) {
this.dbName = dbName;
this.tableName = tableName;
this.option = option;
this.writerType = writerType;
+ this.insertMemoryUsageLimit = insertMemoryUsageLimit;
try {
this.executor = new SqlClusterExecutor(option);
// no need to check table exists, schema() will check it later
@@ -69,7 +71,7 @@ public OpenmldbTable(String dbName, String tableName, SdkOption option, String w
@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
- OpenmldbWriteConfig config = new OpenmldbWriteConfig(dbName, tableName, option, writerType);
+ OpenmldbWriteConfig config = new OpenmldbWriteConfig(dbName, tableName, option, writerType, insertMemoryUsageLimit);
return new OpenmldbWriteBuilder(config, info);
}
diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java
index 2885aaba70e..843fb9a8da7 100644
--- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java
+++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java
@@ -48,6 +48,7 @@ public OpenmldbDataSingleWriter(OpenmldbWriteConfig config, int partitionId, lon
SqlClusterExecutor executor = new SqlClusterExecutor(option);
String dbName = config.dbName;
String tableName = config.tableName;
+ executor.executeSQL(dbName, "SET @@insert_memory_usage_limit=" + config.insertMemoryUsageLimit);
Schema schema = executor.getTableSchema(dbName, tableName);
// create insert placeholder
diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java
index 5da75e99348..e9ba0e30c5a 100644
--- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java
+++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java
@@ -48,6 +48,7 @@ public OpenmldbDataWriter(OpenmldbWriteConfig config, int partitionId, long task
SqlClusterExecutor executor = new SqlClusterExecutor(option);
String dbName = config.dbName;
String tableName = config.tableName;
+ executor.executeSQL(dbName, "SET @@insert_memory_usage_limit=" + config.insertMemoryUsageLimit);
Schema schema = executor.getTableSchema(dbName, tableName);
// create insert placeholder
diff --git a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java
index 89c2d801ca5..80007b14ae5 100644
--- a/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java
+++ b/java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java
@@ -24,13 +24,15 @@
// Must serializable
public class OpenmldbWriteConfig implements Serializable {
public final String dbName, tableName, zkCluster, zkPath, writerType;
+ public final int insertMemoryUsageLimit;
- public OpenmldbWriteConfig(String dbName, String tableName, SdkOption option, String writerType) {
+ public OpenmldbWriteConfig(String dbName, String tableName, SdkOption option, String writerType, int insertMemoryUsageLimit) {
this.dbName = dbName;
this.tableName = tableName;
this.zkCluster = option.getZkCluster();
this.zkPath = option.getZkPath();
this.writerType = writerType;
+ this.insertMemoryUsageLimit = insertMemoryUsageLimit;
// TODO(hw): other configs in SdkOption
}
}
diff --git a/src/apiserver/api_server_impl.h b/src/apiserver/api_server_impl.h
index ee41e34935b..fc8e8022417 100644
--- a/src/apiserver/api_server_impl.h
+++ b/src/apiserver/api_server_impl.h
@@ -27,15 +27,13 @@
#include "absl/status/status.h"
#include "apiserver/interface_provider.h"
#include "apiserver/json_helper.h"
-#include "rapidjson/document.h" // raw rapidjson 1.1.0, not in butil
+#include "bvar/bvar.h"
+#include "bvar/multi_dimension.h" // latency recorder
#include "proto/api_server.pb.h"
+#include "rapidjson/document.h" // raw rapidjson 1.1.0, not in butil
#include "sdk/sql_cluster_router.h"
#include "sdk/sql_request_row.h"
-#include "absl/status/status.h"
-#include "bvar/bvar.h"
-#include "bvar/multi_dimension.h" // latency recorder
-
namespace openmldb {
namespace apiserver {
diff --git a/src/base/status.h b/src/base/status.h
index 5995138edd6..8ac134b18bd 100644
--- a/src/base/status.h
+++ b/src/base/status.h
@@ -96,6 +96,7 @@ enum ReturnCode {
kInvalidArgs = 161,
kCheckIndexFailed = 162,
kCatalogUpdateFailed = 163,
+ kExceedPutMemoryLimit = 164,
kNameserverIsNotLeader = 300,
kAutoFailoverIsEnabled = 301,
kEndpointIsNotExist = 302,
diff --git a/src/base/sys_info.h b/src/base/sys_info.h
new file mode 100644
index 00000000000..4b61b5e22d4
--- /dev/null
+++ b/src/base/sys_info.h
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2021 4Paradigm
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_BASE_SYS_INFO_H_
+#define SRC_BASE_SYS_INFO_H_
+
+#include "absl/strings/ascii.h"
+#include "absl/strings/match.h"
+#include "absl/strings/numbers.h"
+#include "absl/strings/string_view.h"
+#include "base/status.h"
+
+namespace openmldb::base {
+
+constexpr const char* MEM_TOTAL = "MemTotal";
+constexpr const char* MEM_BUFFERS = "Buffers";
+constexpr const char* MEM_CACHED = "Cached";
+constexpr const char* MEM_FREE = "MemFree";
+constexpr const char* SRECLAIMABLE = "SReclaimable";
+
+/* We calculate MemAvailable as follows
+ * MemAvailable = MemFree + Buffers + Cached + SReclaimable
+ * refer https://www.kernel.org/doc/Documentation/filesystems/proc.txt
+ * */
+
+struct SysInfo {
+ uint64_t mem_total = 0; // unit is kB
+ uint64_t mem_used = 0; // unit is kB
+ uint64_t mem_free = 0; // unit is kB
+ uint64_t mem_buffers = 0; // unit is kB
+ uint64_t mem_cached = 0; // unit is kB
+};
+
+base::Status GetSysMem(SysInfo* info) {
+#if defined(__linux__)
+ FILE *fd = fopen("/proc/meminfo", "r");
+ if (fd == nullptr) {
+ return {ReturnCode::kError, "fail to open meminfo file"};
+ }
+ char line[256];
+ auto parse = [](absl::string_view str, absl::string_view key, uint64_t* val) -> base::Status {
+ str.remove_prefix(key.size() + 1);
+ str.remove_suffix(2);
+ str = absl::StripAsciiWhitespace(str);
+ if (!absl::SimpleAtoi(str, val)) {
+ return {ReturnCode::kError, absl::StrCat("fail to parse ", key)};
+ }
+ return {};
+ };
+ int parse_cnt = 0;
+ uint64_t s_reclaimable = 0;
+ while (fgets(line, sizeof(line), fd)) {
+ absl::string_view str_view(line);
+ str_view = absl::StripAsciiWhitespace(str_view);
+ if (absl::StartsWith(str_view, MEM_TOTAL)) {
+ if (auto status = parse(str_view, MEM_TOTAL, &info->mem_total); !status.OK()) {
+ return status;
+ }
+ parse_cnt++;
+ } else if (absl::StartsWith(str_view, MEM_BUFFERS)) {
+ if (auto status = parse(str_view, MEM_BUFFERS, &info->mem_buffers); !status.OK()) {
+ return status;
+ }
+ parse_cnt++;
+ } else if (absl::StartsWith(str_view, MEM_CACHED)) {
+ if (auto status = parse(str_view, MEM_CACHED, &info->mem_cached); !status.OK()) {
+ return status;
+ }
+ parse_cnt++;
+ } else if (absl::StartsWith(str_view, MEM_FREE)) {
+ if (auto status = parse(str_view, MEM_FREE, &info->mem_free); !status.OK()) {
+ return status;
+ }
+ parse_cnt++;
+ } else if (absl::StartsWith(str_view, SRECLAIMABLE)) {
+ if (auto status = parse(str_view, SRECLAIMABLE, &s_reclaimable); !status.OK()) {
+ return status;
+ }
+ parse_cnt++;
+ }
+ }
+ if (parse_cnt != 5) {
+ return {ReturnCode::kError, "fail to parse meminfo"};
+ }
+ info->mem_cached += s_reclaimable;
+ info->mem_used = info->mem_total - info->mem_buffers - info->mem_cached - info->mem_free;
+ fclose(fd);
+#endif
+ return {};
+}
+
+} // namespace openmldb::base
+
+#endif // SRC_BASE_SYS_INFO_H_
diff --git a/src/base/sys_info_test.cc b/src/base/sys_info_test.cc
new file mode 100644
index 00000000000..4d5f5cc03c8
--- /dev/null
+++ b/src/base/sys_info_test.cc
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2021 4Paradigm
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "gtest/gtest.h"
+#include "base/sys_info.h"
+
+namespace openmldb {
+namespace base {
+
+class SystemInfoTest : public ::testing::Test {
+ public:
+ SystemInfoTest() {}
+ ~SystemInfoTest() {}
+};
+
+TEST_F(SystemInfoTest, GetMemory) {
+ base::SysInfo info;
+ auto status = base::GetSysMem(&info);
+ ASSERT_TRUE(status.OK());
+ ASSERT_GT(info.mem_total, 0);
+ ASSERT_GT(info.mem_used, 0);
+ ASSERT_GT(info.mem_free, 0);
+ ASSERT_EQ(info.mem_total, info.mem_used + info.mem_buffers + info.mem_free + info.mem_cached);
+ /*printf("total:%lu\n", info.mem_total);
+ printf("used:%lu\n", info.mem_used);
+ printf("free:%lu\n", info.mem_free);
+ printf("buffers:%lu\n", info.mem_buffers);
+ printf("cached:%lu\n", info.mem_cached);*/
+}
+
+} // namespace base
+} // namespace openmldb
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/client/tablet_client.cc b/src/client/tablet_client.cc
index 878d2a5f3cc..54b2a8c9cec 100644
--- a/src/client/tablet_client.cc
+++ b/src/client/tablet_client.cc
@@ -201,36 +201,43 @@ bool TabletClient::UpdateTableMetaForAddField(uint32_t tid, const std::vector>& dimensions) {
+base::Status TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const std::string& value,
+ const std::vector>& dimensions,
+ int memory_usage_limit) {
::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension> pb_dimensions;
for (size_t i = 0; i < dimensions.size(); i++) {
::openmldb::api::Dimension* d = pb_dimensions.Add();
d->set_key(dimensions[i].first);
d->set_idx(dimensions[i].second);
}
- return Put(tid, pid, time, base::Slice(value), &pb_dimensions);
+ return Put(tid, pid, time, base::Slice(value), &pb_dimensions, memory_usage_limit);
}
-bool TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Slice& value,
- ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>* dimensions) {
+base::Status TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Slice& value,
+ ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>* dimensions,
+ int memory_usage_limit) {
::openmldb::api::PutRequest request;
+ if (memory_usage_limit < 0 || memory_usage_limit > 100) {
+ return {base::ReturnCode::kError, absl::StrCat("invalid memory_usage_limit ", memory_usage_limit)};
+ } else if (memory_usage_limit > 0) {
+ request.set_memory_limit(memory_usage_limit);
+ }
request.set_time(time);
request.set_value(value.data(), value.size());
request.set_tid(tid);
request.set_pid(pid);
request.mutable_dimensions()->Swap(dimensions);
::openmldb::api::PutResponse response;
- bool ok =
- client_.SendRequest(&::openmldb::api::TabletServer_Stub::Put, &request, &response, FLAGS_request_timeout_ms, 1);
- if (ok && response.code() == 0) {
- return true;
+ auto st = client_.SendRequestSt(&::openmldb::api::TabletServer_Stub::Put,
+ &request, &response, FLAGS_request_timeout_ms, 1);
+ if (!st.OK()) {
+ return st;
}
- LOG(WARNING) << "fail to send write request for " << response.msg() << " and error code " << response.code();
- return false;
+ return {response.code(), response.msg()};
}
-bool TabletClient::Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, const std::string& value) {
+base::Status TabletClient::Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time,
+ const std::string& value) {
::openmldb::api::PutRequest request;
auto dim = request.add_dimensions();
dim->set_key(pk);
@@ -240,14 +247,12 @@ bool TabletClient::Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64
request.set_tid(tid);
request.set_pid(pid);
::openmldb::api::PutResponse response;
-
- bool ok =
- client_.SendRequest(&::openmldb::api::TabletServer_Stub::Put, &request, &response, FLAGS_request_timeout_ms, 1);
- if (ok && response.code() == 0) {
- return true;
+ auto st = client_.SendRequestSt(&::openmldb::api::TabletServer_Stub::Put,
+ &request, &response, FLAGS_request_timeout_ms, 1);
+ if (!st.OK()) {
+ return st;
}
- LOG(WARNING) << "fail to put for error " << response.msg();
- return false;
+ return {response.code(), response.msg()};
}
bool TabletClient::MakeSnapshot(uint32_t tid, uint32_t pid, uint64_t offset, std::shared_ptr task_info) {
diff --git a/src/client/tablet_client.h b/src/client/tablet_client.h
index 9fee8e08392..b4866b77618 100644
--- a/src/client/tablet_client.h
+++ b/src/client/tablet_client.h
@@ -72,18 +72,19 @@ class TabletClient : public Client {
std::shared_ptr<::openmldb::sdk::SQLRequestRowBatch>, brpc::Controller* cntl,
::openmldb::api::SQLBatchRequestQueryResponse* response, const bool is_debug = false);
- bool Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, const std::string& value);
+ base::Status Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, const std::string& value);
- bool Put(uint32_t tid, uint32_t pid, uint64_t time, const std::string& value,
- const std::vector>& dimensions);
+ base::Status Put(uint32_t tid, uint32_t pid, uint64_t time, const std::string& value,
+ const std::vector>& dimensions,
+ int memory_usage_limit = 0);
- bool Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Slice& value,
- ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>* dimensions);
+ base::Status Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Slice& value,
+ ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>* dimensions,
+ int memory_usage_limit = 0);
bool Get(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, std::string& value, // NOLINT
uint64_t& ts, // NOLINT
- std::string& msg);
- ; // NOLINT
+ std::string& msg); // NOLINT
bool Get(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, const std::string& idx_name,
std::string& value, // NOLINT
diff --git a/src/cmd/openmldb.cc b/src/cmd/openmldb.cc
index b4d12210cdf..8d0d9b692f5 100644
--- a/src/cmd/openmldb.cc
+++ b/src/cmd/openmldb.cc
@@ -304,7 +304,7 @@ int PutData(uint32_t tid, const std::mapPut(tid, pid, ts, value, iter->second)) {
+ if (!clients[endpoint]->Put(tid, pid, ts, value, iter->second).OK()) {
printf("put failed. tid %u pid %u endpoint %s ts %lu \n", tid, pid, endpoint.c_str(), ts);
return -1;
}
diff --git a/src/datacollector/data_collector.cc b/src/datacollector/data_collector.cc
index 1af941226cf..30d10b714f9 100644
--- a/src/datacollector/data_collector.cc
+++ b/src/datacollector/data_collector.cc
@@ -259,7 +259,8 @@ void DataCollectorImpl::CreateTaskEnv(const datasync::AddSyncTaskRequest* reques
auto tablet_client = tablet_client_map_[tablet_endpoint];
api::TableStatus table_status;
if (auto st = tablet_client->GetTableStatus(tid, pid, table_status); !st.OK()) {
- SET_RESP_AND_WARN(response, -1, "get table status from tablet server failed, maybe table doesn't exist: " + st.GetMsg());
+ SET_RESP_AND_WARN(response, -1, "get table status from tablet server failed, maybe table doesn't exist: "
+ + st.GetMsg());
return;
}
if (!ValidateTableStatus(table_status)) {
diff --git a/src/datacollector/data_collector_test.cc b/src/datacollector/data_collector_test.cc
index d08cd4b71b8..cbd11b41fdd 100644
--- a/src/datacollector/data_collector_test.cc
+++ b/src/datacollector/data_collector_test.cc
@@ -14,12 +14,12 @@
* limitations under the License.
*/
-#include "datacollector/data_collector.h"
+#include
+#include "codec/sdk_codec.h"
+#include "datacollector/data_collector.h"
#include "gflags/gflags.h"
#include "gtest/gtest.h"
-
-#include "codec/sdk_codec.h"
#include "vm/engine.h"
using openmldb::client::TabletClient;
diff --git a/src/flags.cc b/src/flags.cc
index 42e085781eb..3366d04accd 100644
--- a/src/flags.cc
+++ b/src/flags.cc
@@ -34,6 +34,7 @@ DEFINE_string(zk_auth_schema, "digest", "config the id of authentication schema"
DEFINE_string(zk_cert, "", "config the application credentials");
DEFINE_string(tablet, "", "config the endpoint of tablet");
DEFINE_string(nameserver, "", "config the endpoint of nameserver");
+DEFINE_int32(get_sys_mem_interval, 10000, "config the interval of get system memory. unit is milliseconds");
DEFINE_int32(zk_keep_alive_check_interval, 15000, "config the interval of keep alive check. unit is milliseconds");
DEFINE_uint32(zk_log_level, 0,
"CLI: set level integer, DISABLE_LOGGING=0, "
diff --git a/src/nameserver/name_server_impl.cc b/src/nameserver/name_server_impl.cc
index d9ce3aff439..2bfb675bf3c 100644
--- a/src/nameserver/name_server_impl.cc
+++ b/src/nameserver/name_server_impl.cc
@@ -9908,7 +9908,7 @@ base::Status NameServerImpl::InitGlobalVarTable() {
uint64_t cur_ts = ::baidu::common::timer::get_micros() / 1000;
std::string endpoint = table_info->table_partition(0).partition_meta(meta_idx).endpoint();
auto table_ptr = GetTablet(endpoint);
- if (!table_ptr->client_->Put(tid, pid, cur_ts, row, dimensions)) {
+ if (!table_ptr->client_->Put(tid, pid, cur_ts, row, dimensions).OK()) {
return {ReturnCode::kPutFailed, "fail to make a put request to table"};
}
break;
diff --git a/src/proto/name_server.proto b/src/proto/name_server.proto
index 219b83a0b73..e605692d3fe 100755
--- a/src/proto/name_server.proto
+++ b/src/proto/name_server.proto
@@ -98,7 +98,7 @@ message TableInfo {
repeated openmldb.common.ColumnDesc column_desc = 9;
repeated openmldb.common.ColumnKey column_key = 10;
repeated openmldb.common.ColumnDesc added_column_desc = 11;
- optional uint32 format_version = 12 [default = 1];
+ optional uint32 format_version = 12 [default = 1, deprecated = true];
optional string db = 13 [default = ""];
repeated string partition_key = 14;
repeated common.VersionPair schema_versions = 15;
diff --git a/src/proto/tablet.proto b/src/proto/tablet.proto
index ee0ec5beae1..2c7a038960b 100755
--- a/src/proto/tablet.proto
+++ b/src/proto/tablet.proto
@@ -193,7 +193,8 @@ message PutRequest {
optional uint32 pid = 5;
repeated Dimension dimensions = 6;
repeated TSDimension ts_dimensions = 7 [deprecated = true];
- optional uint32 format_version = 8 [default = 0];
+ optional uint32 format_version = 8 [default = 0, deprecated = true];
+ optional uint32 memory_limit = 9;
}
message PutResponse {
@@ -324,7 +325,7 @@ message TableMeta {
repeated openmldb.common.ColumnKey column_key = 11;
repeated openmldb.common.ColumnDesc added_column_desc = 12;
// format_version 0 , the legacy format 1 ,the new one
- optional uint32 format_version = 13 [default = 0];
+ optional uint32 format_version = 13 [default = 0, deprecated = true];
optional string db = 14 [default = ""];
repeated common.VersionPair schema_versions = 15;
repeated common.TablePartition table_partition = 16;
diff --git a/src/replica/snapshot_replica_test.cc b/src/replica/snapshot_replica_test.cc
index 05e9a9d01da..64b8c2565ff 100644
--- a/src/replica/snapshot_replica_test.cc
+++ b/src/replica/snapshot_replica_test.cc
@@ -130,7 +130,7 @@ TEST_P(SnapshotReplicaTest, LeaderAndFollower) {
ASSERT_TRUE(status.OK());
uint64_t cur_time = ::baidu::common::timer::get_micros() / 1000;
auto ret = client.Put(tid, pid, "testkey", cur_time, ::openmldb::test::EncodeKV("testkey", "value1"));
- ASSERT_TRUE(ret);
+ ASSERT_TRUE(ret.OK());
uint32_t count = 0;
while (count < 10) {
@@ -185,7 +185,7 @@ TEST_P(SnapshotReplicaTest, LeaderAndFollower) {
ASSERT_EQ(0, srp.code());
ret = client.Put(tid, pid, "newkey", cur_time, ::openmldb::test::EncodeKV("newkey", "value2"));
- ASSERT_TRUE(ret);
+ ASSERT_TRUE(ret.OK());
sleep(2);
sr.set_pk("newkey");
tablet1->Scan(NULL, &sr, &srp, &closure);
@@ -240,7 +240,7 @@ TEST_P(SnapshotReplicaTest, SendSnapshot) {
ASSERT_TRUE(status.OK());
uint64_t cur_time = ::baidu::common::timer::get_micros() / 1000;
auto ret = client.Put(tid, pid, "testkey", cur_time, ::openmldb::test::EncodeKV("testkey", "value1"));
- ASSERT_TRUE(ret);
+ ASSERT_TRUE(ret.OK());
uint32_t count = 0;
while (count < 10) {
@@ -351,7 +351,7 @@ TEST_P(SnapshotReplicaTest, IncompleteSnapshot) {
16, 0, ::openmldb::type::CompressType::kNoCompress, storage_mode));
ASSERT_TRUE(status.OK());
auto ret = client.Put(tid, pid, "testkey", cur_time, ::openmldb::test::EncodeKV("testkey", "value1"));
- ASSERT_TRUE(ret);
+ ASSERT_TRUE(ret.OK());
uint32_t count = 0;
while (count < 10) {
@@ -420,7 +420,7 @@ TEST_P(SnapshotReplicaTest, IncompleteSnapshot) {
ASSERT_EQ(0, srp.code());
std::string key = "test2";
- ASSERT_TRUE(client.Put(tid, pid, key, cur_time, ::openmldb::test::EncodeKV(key, key)));
+ ASSERT_TRUE(client.Put(tid, pid, key, cur_time, ::openmldb::test::EncodeKV(key, key)).OK());
sr.set_tid(tid);
sr.set_pid(pid);
@@ -583,7 +583,7 @@ TEST_P(SnapshotReplicaTest, LeaderAndFollowerTS) {
std::vector row = {"card0", "mcc0", "1.3", std::to_string(cur_time), std::to_string(cur_time - 100)};
std::string value;
sdk_codec.EncodeRow(row, &value);
- ASSERT_TRUE(client.Put(tid, pid, cur_time, value, dimensions));
+ ASSERT_TRUE(client.Put(tid, pid, cur_time, value, dimensions).OK());
sleep(3);
::openmldb::test::TempPath temp_path;
diff --git a/src/sdk/mini_cluster_batch_bm.cc b/src/sdk/mini_cluster_batch_bm.cc
index 1b5227f3367..8dc4e9e665e 100644
--- a/src/sdk/mini_cluster_batch_bm.cc
+++ b/src/sdk/mini_cluster_batch_bm.cc
@@ -96,7 +96,7 @@ static void BM_SimpleQueryFunction(benchmark::State& state) { // NOLINT
uint32_t tid = sdk.GetTableId(db, name);
{
for (int32_t i = 0; i < 1000; i++) {
- ok = tablet[0]->GetClient()->Put(tid, 0, pk, ts + i, value);
+ tablet[0]->GetClient()->Put(tid, 0, pk, ts + i, value);
}
}
std::string sql = "select col1, col2 + 1, col3, col4, col5 from " + name + " ;";
diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc
index 0e172322921..804e8502fa6 100644
--- a/src/sdk/sql_cluster_router.cc
+++ b/src/sdk/sql_cluster_router.cc
@@ -320,6 +320,7 @@ bool SQLClusterRouter::Init() {
session_variables_.emplace("enable_trace", "false");
session_variables_.emplace("sync_job", "false");
session_variables_.emplace("job_timeout", "60000"); // rpc request timeout for taskmanager
+ session_variables_.emplace("insert_memory_usage_limit", "0");
session_variables_.emplace("spark_config", "");
}
return true;
@@ -1366,8 +1367,9 @@ bool SQLClusterRouter::PutRow(uint32_t tid, const std::shared_ptr&
if (client) {
DLOG(INFO) << "put data to endpoint " << client->GetEndpoint() << " with dimensions size "
<< kv.second.size();
- bool ret = client->Put(tid, pid, cur_ts, row->GetRow(), kv.second);
- if (!ret) {
+ auto ret = client->Put(tid, pid, cur_ts, row->GetRow(), kv.second,
+ insert_memory_usage_limit_.load(std::memory_order_relaxed));
+ if (!ret.OK()) {
SET_STATUS_AND_WARN(status, StatusCode::kCmdError,
"INSERT failed, tid " + std::to_string(tid) +
". Note that data might have been partially inserted. "
@@ -1482,8 +1484,9 @@ bool SQLClusterRouter::ExecuteInsert(const std::string& db, const std::string& n
if (client) {
DLOG(INFO) << "put data to endpoint " << client->GetEndpoint() << " with dimensions size "
<< kv.second.size();
- bool ret = client->Put(tid, pid, cur_ts, row_value, &kv.second);
- if (!ret) {
+ auto ret = client->Put(tid, pid, cur_ts, row_value, &kv.second,
+ insert_memory_usage_limit_.load(std::memory_order_relaxed));
+ if (!ret.OK()) {
SET_STATUS_AND_WARN(status, StatusCode::kCmdError,
"INSERT failed, tid " + std::to_string(tid) +
". Note that data might have been partially inserted. "
@@ -2859,6 +2862,8 @@ std::shared_ptr SQLClusterRouter::ExecuteSQL(
::openmldb::base::Status base_status;
if (is_online_mode) {
// Handle in online mode
+ config.emplace("insert_memory_usage_limit",
+ std::to_string(insert_memory_usage_limit_.load(std::memory_order_relaxed)));
base_status = ImportOnlineData(sql, config, database, is_sync_job, offline_job_timeout, &job_info);
} else {
// Handle in offline mode
@@ -3112,6 +3117,15 @@ ::hybridse::sdk::Status SQLClusterRouter::SetVariable(hybridse::node::SetPlanNod
if (!absl::SimpleAtoi(value, &new_timeout)) {
return {StatusCode::kCmdError, "Fail to parse value, can't set the request timeout"};
}
+ } else if (key == "insert_memory_usage_limit") {
+ int limit = 0;
+ if (!absl::SimpleAtoi(value, &limit)) {
+ return {StatusCode::kCmdError, "Fail to parse value, can't set the insert_memory_usage_limit"};
+ }
+ if (limit < 0 || limit > 100) {
+ return {StatusCode::kCmdError, "Invalid value! The value must be between 0 and 100"};
+ }
+ insert_memory_usage_limit_.store(limit, std::memory_order_relaxed);
} else if (key == "spark_config") {
if (!CheckSparkConfigString(value)) {
return {StatusCode::kCmdError,
diff --git a/src/sdk/sql_cluster_router.h b/src/sdk/sql_cluster_router.h
index 53d2389b575..a4e1126b542 100644
--- a/src/sdk/sql_cluster_router.h
+++ b/src/sdk/sql_cluster_router.h
@@ -434,6 +434,7 @@ class SQLClusterRouter : public SQLRouter {
input_lru_cache_;
::openmldb::base::SpinMutex mu_;
::openmldb::base::Random rand_;
+ std::atomic insert_memory_usage_limit_ = 0; // [0-100], the default value 0 means unlimited
};
class Bias {
diff --git a/src/storage/table_test.cc b/src/storage/table_test.cc
index 8e9b7b09ca8..251e92986c6 100644
--- a/src/storage/table_test.cc
+++ b/src/storage/table_test.cc
@@ -1924,7 +1924,6 @@ TEST_P(TableTest, NegativeTs) {
table_meta.set_seg_cnt(8);
table_meta.set_mode(::openmldb::api::TableMode::kTableLeader);
table_meta.set_key_entry_max_height(8);
- table_meta.set_format_version(1);
table_meta.set_storage_mode(storageMode);
SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "card", ::openmldb::type::kString);
SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "ts1", ::openmldb::type::kBigInt);
diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc
index 2c506be510f..c313bf6be72 100644
--- a/src/tablet/tablet_impl.cc
+++ b/src/tablet/tablet_impl.cc
@@ -43,6 +43,7 @@
#include "base/proto_util.h"
#include "base/status.h"
#include "base/strings.h"
+#include "base/sys_info.h"
#include "brpc/controller.h"
#include "butil/iobuf.h"
#include "codec/codec.h"
@@ -77,6 +78,7 @@ DECLARE_uint32(scan_max_bytes_size);
DECLARE_uint32(scan_reserve_size);
DECLARE_uint32(max_memory_mb);
DECLARE_double(mem_release_rate);
+DECLARE_int32(get_sys_mem_interval);
DECLARE_string(db_root_path);
DECLARE_string(ssd_root_path);
DECLARE_string(hdd_root_path);
@@ -135,7 +137,7 @@ TabletImpl::TabletImpl()
replicators_(),
snapshots_(),
zk_client_(nullptr),
- keep_alive_pool_(1),
+ trivial_task_pool_(1),
task_pool_(FLAGS_task_pool_size),
io_pool_(FLAGS_io_pool_size),
snapshot_pool_(FLAGS_snapshot_pool_size),
@@ -154,7 +156,7 @@ TabletImpl::TabletImpl()
TabletImpl::~TabletImpl() {
task_pool_.Stop(true);
- keep_alive_pool_.Stop(true);
+ trivial_task_pool_.Stop(true);
gc_pool_.Stop(true);
io_pool_.Stop(true);
snapshot_pool_.Stop(true);
@@ -315,6 +317,9 @@ bool TabletImpl::Init(const std::string& zk_cluster, const std::string& zk_path,
#ifdef TCMALLOC_ENABLE
MallocExtension* tcmalloc = MallocExtension::instance();
tcmalloc->SetMemoryReleaseRate(FLAGS_mem_release_rate);
+#endif
+#if defined(__linux__)
+ trivial_task_pool_.DelayTask(FLAGS_get_sys_mem_interval, boost::bind(&TabletImpl::UpdateMemoryUsage, this));
#endif
return true;
}
@@ -402,7 +407,7 @@ bool TabletImpl::RegisterZK() {
LOG(WARNING) << "add notify watcher failed";
return false;
}
- keep_alive_pool_.DelayTask(FLAGS_zk_keep_alive_check_interval, boost::bind(&TabletImpl::CheckZkClient, this));
+ trivial_task_pool_.DelayTask(FLAGS_zk_keep_alive_check_interval, boost::bind(&TabletImpl::CheckZkClient, this));
}
return true;
}
@@ -437,6 +442,21 @@ bool TabletImpl::CheckGetDone(::openmldb::api::GetType type, uint64_t ts, uint64
return false;
}
+void TabletImpl::UpdateMemoryUsage() {
+ base::SysInfo info;
+ if (auto status = base::GetSysMem(&info); status.OK()) {
+ if (info.mem_total > 0) {
+ system_memory_usage_rate_.store(info.mem_used * 100 / info.mem_total, std::memory_order_relaxed);
+ DEBUGLOG("system_memory_usage_rate is %u", system_memory_usage_rate_.load(std::memory_order_relaxed));
+ } else {
+ PDLOG(WARNING, "total memory is zero");
+ }
+ } else {
+ PDLOG(WARNING, "GetSysMem run failed. error message %s", status.GetMsg().c_str());
+ }
+ trivial_task_pool_.DelayTask(FLAGS_get_sys_mem_interval, boost::bind(&TabletImpl::UpdateMemoryUsage, this));
+}
+
int32_t TabletImpl::GetIndex(const ::openmldb::api::GetRequest* request, const ::openmldb::api::TableMeta& meta,
const std::map>& vers_schema, CombineIterator* it,
std::string* value, uint64_t* ts) {
@@ -714,13 +734,22 @@ void TabletImpl::Put(RpcController* controller, const ::openmldb::api::PutReques
response->set_msg("table is loading");
return;
}
- if (table->GetStorageMode() == ::openmldb::common::StorageMode::kMemory &&
- memory_used_.load(std::memory_order_relaxed) > FLAGS_max_memory_mb) {
- PDLOG(WARNING, "current memory %lu MB exceed max memory limit %lu MB. tid %u, pid %u",
- memory_used_.load(std::memory_order_relaxed), FLAGS_max_memory_mb, tid, pid);
- response->set_code(::openmldb::base::ReturnCode::kExceedMaxMemory);
- response->set_msg("exceed max memory");
- return;
+ if (table->GetStorageMode() == ::openmldb::common::StorageMode::kMemory) {
+ if (memory_used_.load(std::memory_order_relaxed) > FLAGS_max_memory_mb) {
+ PDLOG(WARNING, "current memory %lu MB exceed max memory limit %lu MB. tid %u, pid %u",
+ memory_used_.load(std::memory_order_relaxed), FLAGS_max_memory_mb, tid, pid);
+ response->set_code(base::ReturnCode::kExceedMaxMemory);
+ response->set_msg("exceed max memory");
+ return;
+ }
+ if (request->has_memory_limit() && request->memory_limit() > 0
+ && system_memory_usage_rate_.load(std::memory_order_relaxed) > request->memory_limit()) {
+ PDLOG(WARNING, "current system_memory_usage_rate %u exceed request memory limit %u. tid %u, pid %u",
+ system_memory_usage_rate_.load(std::memory_order_relaxed), request->memory_limit(), tid, pid);
+ response->set_code(base::ReturnCode::kExceedPutMemoryLimit);
+ response->set_msg("exceed memory limit");
+ return;
+ }
}
::openmldb::api::LogEntry entry;
entry.set_pk(request->pk());
@@ -2968,7 +2997,8 @@ void TabletImpl::LoadTable(RpcController* controller, const ::openmldb::api::Loa
std::string db_path = GetDBPath(root_path, tid, pid);
if (!::openmldb::base::IsExists(db_path)) {
- PDLOG(WARNING, "table db path does not exist, but still load. tid %u, pid %u, path %s", tid, pid, db_path.c_str());
+ PDLOG(WARNING, "table db path does not exist, but still load. tid %u, pid %u, path %s",
+ tid, pid, db_path.c_str());
}
std::shared_ptr table = GetTable(tid, pid);
@@ -4265,7 +4295,7 @@ void TabletImpl::CheckZkClient() {
PDLOG(INFO, "registe zk ok");
}
}
- keep_alive_pool_.DelayTask(FLAGS_zk_keep_alive_check_interval, boost::bind(&TabletImpl::CheckZkClient, this));
+ trivial_task_pool_.DelayTask(FLAGS_zk_keep_alive_check_interval, boost::bind(&TabletImpl::CheckZkClient, this));
}
}
diff --git a/src/tablet/tablet_impl.h b/src/tablet/tablet_impl.h
index c24a253c9e9..9d28012ef5c 100644
--- a/src/tablet/tablet_impl.h
+++ b/src/tablet/tablet_impl.h
@@ -435,6 +435,8 @@ class TabletImpl : public ::openmldb::api::TabletServer {
// refresh the pre-aggr tables info
bool RefreshAggrCatalog();
+ void UpdateMemoryUsage();
+
private:
Tables tables_;
std::mutex mu_;
@@ -444,7 +446,7 @@ class TabletImpl : public ::openmldb::api::TabletServer {
Snapshots snapshots_;
Aggregators aggregators_;
ZkClient* zk_client_;
- ThreadPool keep_alive_pool_;
+ ThreadPool trivial_task_pool_;
ThreadPool task_pool_;
ThreadPool io_pool_;
ThreadPool snapshot_pool_;
@@ -474,6 +476,7 @@ class TabletImpl : public ::openmldb::api::TabletServer {
std::unique_ptr deploy_collector_;
std::atomic memory_used_ = 0;
+ std::atomic system_memory_usage_rate_ = 0; // [0, 100]
};
} // namespace tablet