feat: put returns failure if the server memory usage exceeds the specified ratio #3631

Merged
merged 13 commits into from
Jan 15, 2024
1 change: 1 addition & 0 deletions docs/en/openmldb_sql/dml/LOAD_DATA_STATEMENT.md
Expand Up @@ -65,6 +65,7 @@ The following table introduces the parameters of `LOAD DATA INFILE`.
- As mentioned in the table above, the online execution mode only supports the `append` input mode.

- When `deep_copy=false`, OpenMLDB does not support modifying the data behind a soft link. Therefore, if the current offline data comes from a soft link, `append` import is no longer supported. Moreover, if the current connection is a soft copy, a hard copy with `overwrite` will not delete the data of the soft connection.
- If the `insert_memory_usage_limit` session variable is set, online import returns a failure when the server memory usage exceeds the configured value.

```

1 change: 1 addition & 0 deletions docs/en/reference/sql/ddl/SET_STATEMENT.md
Expand Up @@ -35,6 +35,7 @@ The following format is also equivalent.
| @@session.sync_job|@@sync_job | When the value is `true`, the offline command will be executed synchronously, waiting for the final result of the execution.<br />When the value is `false`, the offline command returns immediately. If you need to check the execution, please use `SHOW JOB` command. | `true`, <br /> `false` | `false` |
| @@session.sync_timeout|@@sync_timeout | When `sync_job=true`, sets how long to wait for a synchronous command. Once the timeout is reached, the command returns immediately; you can still check its execution with `SHOW JOB` afterwards. | Int | 20000 |
| @@session.spark_config|@@spark_config | Sets the Spark configuration for offline jobs, for example 'spark.executor.memory=2g;spark.executor.cores=2'. Note that this Spark configuration has higher priority than the TaskManager Spark configuration but lower priority than the CLI Spark configuration file. | String | "" |
| @@session.insert_memory_usage_limit |@@insert_memory_usage_limit | Sets the server memory usage limit that applies when inserting or importing data. If the server memory usage exceeds this value, the insertion fails. The value ranges from 0 to 100; 0 means unlimited. | Int | "0" |
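
The 0-100 semantics of `insert_memory_usage_limit` can be illustrated with a small check. This is only a sketch: the helper name `ExceedsInsertMemoryLimit` and its parameters are hypothetical and do not appear in this PR, and it assumes the limit is a percentage of total memory and that "exceeds" means strictly greater than the limit.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper (not part of the PR): returns true when an insert
// should be rejected. limit_percent is expected in [0, 100];
// 0 means "unlimited", matching the documented default.
inline bool ExceedsInsertMemoryLimit(uint64_t mem_used_kb, uint64_t mem_total_kb,
                                     int limit_percent) {
    if (limit_percent <= 0 || mem_total_kb == 0) {
        return false;  // no limit configured, or no usable memory reading
    }
    // Compare used/total > limit/100 using integer arithmetic only.
    return mem_used_kb * 100 > mem_total_kb * static_cast<uint64_t>(limit_percent);
}
```

With a limit of 80, a server using exactly 80% of its memory would still accept the insert under this assumption, while one at 81% would reject it.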

## Example

1 change: 1 addition & 0 deletions docs/zh/openmldb_sql/ddl/SET_STATEMENT.md
Expand Up @@ -36,6 +36,7 @@ sessionVariableName ::= '@@'Identifier | '@@session.'Identifier | '@@global.'Ide
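| @@session.sync_job|@@sync_job | When the value is `true`, offline commands run synchronously and wait for the final execution result.<br />When the value is `false`, offline commands return immediately; use `SHOW JOB` to check the execution status. | "true" \| "false" | "false" |
| @@session.job_timeout|@@job_timeout | Configures the waiting time (in *milliseconds*) for offline asynchronous commands or offline management commands, which return immediately. After an offline asynchronous command returns, its execution can still be checked with `SHOW JOB`. | Int | "20000" |
| @@session.spark_config|@@spark_config | Sets the Spark parameters for offline jobs, for example 'spark.executor.memory=2g;spark.executor.cores=2'. Note that this Spark configuration has higher priority than the TaskManager default Spark configuration and lower priority than the command-line Spark configuration file. | String | "" |
| @@session.insert_memory_usage_limit|@@insert_memory_usage_limit | Sets the server memory usage ratio limit that applies when inserting or importing data. The value ranges from 0 to 100. If the server memory usage ratio exceeds the configured value, the insertion fails. 0 means unlimited. | Int | "0" |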
## Example

### Setting and displaying session system variables
2 changes: 2 additions & 0 deletions docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md
Expand Up @@ -115,6 +115,8 @@ LOAD DATA INFILE 'hive://db1.t1' INTO TABLE t1;

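Online import only allows `mode='append'`; `overwrite` and `error_if_exists` are not supported.

If the `insert_memory_usage_limit` session variable is set, a failure is returned when the server memory usage ratio exceeds the configured value.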

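## Offline import rules

A table's offline information can be viewed with `desc <table>`. Data addresses fall into two kinds: the offline address is OpenMLDB's internal storage path, to which hard copies are written, and there is only one; the soft link addresses are the list of addresses imported via soft links.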
Expand Up @@ -40,6 +40,7 @@
// single: insert when read one row
// batch: insert when commit(after read a whole partition)
private String writerType = "single";
private int insertMemoryUsageLimit = 0;

Codecov / codecov/patch: added line #L43 in java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbSource.java was not covered by tests.

@Override
public StructType inferSchema(CaseInsensitiveStringMap options) {
Expand Down Expand Up @@ -71,12 +72,15 @@
writerType = options.get("writerType");
}

if (options.containsKey("insert_memory_usage_limit")) {
insertMemoryUsageLimit = Integer.parseInt(options.get("insert_memory_usage_limit"));

}
return null;
}

@Override
public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
return new OpenmldbTable(dbName, tableName, option, writerType);
return new OpenmldbTable(dbName, tableName, option, writerType, insertMemoryUsageLimit);

}

@Override
Expand Up @@ -49,15 +49,17 @@
private final String tableName;
private final SdkOption option;
private final String writerType;
private final int insertMemoryUsageLimit;
private SqlExecutor executor = null;

private Set<TableCapability> capabilities;

public OpenmldbTable(String dbName, String tableName, SdkOption option, String writerType) {
public OpenmldbTable(String dbName, String tableName, SdkOption option, String writerType, int insertMemoryUsageLimit) {

Codecov / codecov/patch: added line #L57 in java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/OpenmldbTable.java was not covered by tests.
this.dbName = dbName;
this.tableName = tableName;
this.option = option;
this.writerType = writerType;
this.insertMemoryUsageLimit = insertMemoryUsageLimit;

try {
this.executor = new SqlClusterExecutor(option);
// no need to check table exists, schema() will check it later
Expand All @@ -69,7 +71,7 @@

@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
OpenmldbWriteConfig config = new OpenmldbWriteConfig(dbName, tableName, option, writerType);
OpenmldbWriteConfig config = new OpenmldbWriteConfig(dbName, tableName, option, writerType, insertMemoryUsageLimit);

return new OpenmldbWriteBuilder(config, info);
}

Expand Up @@ -48,6 +48,7 @@
SqlClusterExecutor executor = new SqlClusterExecutor(option);
String dbName = config.dbName;
String tableName = config.tableName;
executor.executeSQL(dbName, "SET @@insert_memory_usage_limit=" + config.insertMemoryUsageLimit);

Codecov / codecov/patch: added line #L51 in java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java was not covered by tests.

Schema schema = executor.getTableSchema(dbName, tableName);
// create insert placeholder
Expand Up @@ -48,6 +48,7 @@
SqlClusterExecutor executor = new SqlClusterExecutor(option);
String dbName = config.dbName;
String tableName = config.tableName;
executor.executeSQL(dbName, "SET @@insert_memory_usage_limit=" + config.insertMemoryUsageLimit);

Codecov / codecov/patch: added line #L51 in java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java was not covered by tests.

Schema schema = executor.getTableSchema(dbName, tableName);
// create insert placeholder
Expand Up @@ -24,13 +24,15 @@
// Must serializable
public class OpenmldbWriteConfig implements Serializable {
public final String dbName, tableName, zkCluster, zkPath, writerType;
public final int insertMemoryUsageLimit;

public OpenmldbWriteConfig(String dbName, String tableName, SdkOption option, String writerType) {
public OpenmldbWriteConfig(String dbName, String tableName, SdkOption option, String writerType, int insertMemoryUsageLimit) {

Codecov / codecov/patch: added line #L29 in java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbWriteConfig.java was not covered by tests.
this.dbName = dbName;
this.tableName = tableName;
this.zkCluster = option.getZkCluster();
this.zkPath = option.getZkPath();
this.writerType = writerType;
this.insertMemoryUsageLimit = insertMemoryUsageLimit;

// TODO(hw): other configs in SdkOption
}
}
8 changes: 3 additions & 5 deletions src/apiserver/api_server_impl.h
Expand Up @@ -27,15 +27,13 @@
#include "absl/status/status.h"
#include "apiserver/interface_provider.h"
#include "apiserver/json_helper.h"
#include "rapidjson/document.h" // raw rapidjson 1.1.0, not in butil
#include "bvar/bvar.h"
#include "bvar/multi_dimension.h" // latency recorder
#include "proto/api_server.pb.h"
#include "rapidjson/document.h" // raw rapidjson 1.1.0, not in butil
#include "sdk/sql_cluster_router.h"
#include "sdk/sql_request_row.h"

#include "absl/status/status.h"
#include "bvar/bvar.h"
#include "bvar/multi_dimension.h" // latency recorder

namespace openmldb {
namespace apiserver {

1 change: 1 addition & 0 deletions src/base/status.h
Expand Up @@ -96,6 +96,7 @@ enum ReturnCode {
kInvalidArgs = 161,
kCheckIndexFailed = 162,
kCatalogUpdateFailed = 163,
kExceedPutMemoryLimit = 164,
kNameserverIsNotLeader = 300,
kAutoFailoverIsEnabled = 301,
kEndpointIsNotExist = 302,
107 changes: 107 additions & 0 deletions src/base/sys_info.h
@@ -0,0 +1,107 @@
/*
* Copyright 2021 4Paradigm
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef SRC_BASE_SYS_INFO_H_
#define SRC_BASE_SYS_INFO_H_

#include "absl/strings/ascii.h"
#include "absl/strings/match.h"
#include "absl/strings/numbers.h"
#include "absl/strings/string_view.h"
#include "base/status.h"

namespace openmldb::base {

constexpr const char* MEM_TOTAL = "MemTotal";
constexpr const char* MEM_BUFFERS = "Buffers";
constexpr const char* MEM_CACHED = "Cached";
constexpr const char* MEM_FREE = "MemFree";
constexpr const char* SRECLAIMABLE = "SReclaimable";

/* We calculate MemAvailable as follows
* MemAvailable = MemFree + Buffers + Cached + SReclaimable
* refer https://www.kernel.org/doc/Documentation/filesystems/proc.txt
* */

struct SysInfo {
    uint64_t mem_total = 0;    // unit is kB
    uint64_t mem_used = 0;     // unit is kB
    uint64_t mem_free = 0;     // unit is kB
    uint64_t mem_buffers = 0;  // unit is kB
    uint64_t mem_cached = 0;   // unit is kB
};

base::Status GetSysMem(SysInfo* info) {
#if defined(__linux__)
    FILE *fd = fopen("/proc/meminfo", "r");
    if (fd == nullptr) {
        return {ReturnCode::kError, "fail to open meminfo file"};
    }
    char line[256];
    auto parse = [](absl::string_view str, absl::string_view key, uint64_t* val) -> base::Status {
        str.remove_prefix(key.size() + 1);
        str.remove_suffix(2);
        str = absl::StripAsciiWhitespace(str);
        if (!absl::SimpleAtoi(str, val)) {
            return {ReturnCode::kError, absl::StrCat("fail to parse ", key)};
        }
        return {};
    };
    int parse_cnt = 0;
    uint64_t s_reclaimable = 0;
    while (fgets(line, sizeof(line), fd)) {
        absl::string_view str_view(line);
        str_view = absl::StripAsciiWhitespace(str_view);
        if (absl::StartsWith(str_view, MEM_TOTAL)) {
            if (auto status = parse(str_view, MEM_TOTAL, &info->mem_total); !status.OK()) {
                return status;
            }
            parse_cnt++;
        } else if (absl::StartsWith(str_view, MEM_BUFFERS)) {
            if (auto status = parse(str_view, MEM_BUFFERS, &info->mem_buffers); !status.OK()) {
                return status;
            }
            parse_cnt++;
        } else if (absl::StartsWith(str_view, MEM_CACHED)) {
            if (auto status = parse(str_view, MEM_CACHED, &info->mem_cached); !status.OK()) {
                return status;
            }
            parse_cnt++;
        } else if (absl::StartsWith(str_view, MEM_FREE)) {
            if (auto status = parse(str_view, MEM_FREE, &info->mem_free); !status.OK()) {
                return status;
            }
            parse_cnt++;
        } else if (absl::StartsWith(str_view, SRECLAIMABLE)) {
            if (auto status = parse(str_view, SRECLAIMABLE, &s_reclaimable); !status.OK()) {
                return status;
            }
            parse_cnt++;
        }
    }
    if (parse_cnt != 5) {
        return {ReturnCode::kError, "fail to parse meminfo"};
    }
    info->mem_cached += s_reclaimable;
    info->mem_used = info->mem_total - info->mem_buffers - info->mem_cached - info->mem_free;
    fclose(fd);
#endif
    return {};
}

} // namespace openmldb::base

#endif // SRC_BASE_SYS_INFO_H_
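
The memory accounting in `sys_info.h` (used = MemTotal - MemFree - Buffers - Cached - SReclaimable, with SReclaimable folded into the cached figure) can be tried out on a fixed string instead of the live `/proc/meminfo` file. The following is a standard-library-only sketch, not the PR's implementation: `MemSnapshot` and `ParseMemInfo` are hypothetical names, and unlike the header it matches keys exactly rather than by prefix.

```cpp
#include <cassert>
#include <cstdint>
#include <initializer_list>
#include <map>
#include <sstream>
#include <string>

// Hypothetical stand-in for SysInfo; all values are in kB.
struct MemSnapshot {
    uint64_t total = 0;
    uint64_t used = 0;
    uint64_t free = 0;
    uint64_t buffers = 0;
    uint64_t cached = 0;  // Cached + SReclaimable
};

// Parses text in /proc/meminfo format ("Key:   12345 kB" per line).
// Returns false if a required key is missing or the numbers are inconsistent.
inline bool ParseMemInfo(const std::string& text, MemSnapshot* out) {
    std::map<std::string, uint64_t> kv;
    std::istringstream lines(text);
    std::string line;
    while (std::getline(lines, line)) {
        std::istringstream fields(line);
        std::string key;
        uint64_t value = 0;
        // Expect "Key:" followed by an integer; skip lines of any other shape.
        if (!(fields >> key >> value) || key.empty() || key.back() != ':') {
            continue;
        }
        key.pop_back();  // drop the trailing ':'
        kv[key] = value;
    }
    for (const char* k : {"MemTotal", "MemFree", "Buffers", "Cached", "SReclaimable"}) {
        if (kv.find(k) == kv.end()) {
            return false;
        }
    }
    out->total = kv["MemTotal"];
    out->free = kv["MemFree"];
    out->buffers = kv["Buffers"];
    out->cached = kv["Cached"] + kv["SReclaimable"];
    const uint64_t not_used = out->free + out->buffers + out->cached;
    if (not_used > out->total) {
        return false;  // guard against unsigned underflow
    }
    out->used = out->total - not_used;
    return true;
}
```

For an input with MemTotal 1000, MemFree 200, Buffers 50, Cached 300 and SReclaimable 50 (all in kB), this yields a cached figure of 350 kB and a used figure of 400 kB, so `total == used + buffers + free + cached` holds, which is the same invariant the unit test in this PR asserts.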
50 changes: 50 additions & 0 deletions src/base/sys_info_test.cc
@@ -0,0 +1,50 @@
/*
* Copyright 2021 4Paradigm
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "gtest/gtest.h"
#include "base/sys_info.h"

namespace openmldb {
namespace base {

class SystemInfoTest : public ::testing::Test {
 public:
    SystemInfoTest() {}
    ~SystemInfoTest() {}
};

TEST_F(SystemInfoTest, GetMemory) {
    base::SysInfo info;
    auto status = base::GetSysMem(&info);
    ASSERT_TRUE(status.OK());
    ASSERT_GT(info.mem_total, 0);
    ASSERT_GT(info.mem_used, 0);
    ASSERT_GT(info.mem_free, 0);
    ASSERT_EQ(info.mem_total, info.mem_used + info.mem_buffers + info.mem_free + info.mem_cached);
    /*printf("total:%lu\n", info.mem_total);
    printf("used:%lu\n", info.mem_used);
    printf("free:%lu\n", info.mem_free);
    printf("buffers:%lu\n", info.mem_buffers);
    printf("cached:%lu\n", info.mem_cached);*/
}

}  // namespace base
}  // namespace openmldb

int main(int argc, char** argv) {
    ::testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}
43 changes: 24 additions & 19 deletions src/client/tablet_client.cc
Expand Up @@ -201,36 +201,43 @@
return false;
}

bool TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const std::string& value,
const std::vector<std::pair<std::string, uint32_t>>& dimensions) {
base::Status TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const std::string& value,
const std::vector<std::pair<std::string, uint32_t>>& dimensions,
int memory_usage_limit) {
::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension> pb_dimensions;
for (size_t i = 0; i < dimensions.size(); i++) {
::openmldb::api::Dimension* d = pb_dimensions.Add();
d->set_key(dimensions[i].first);
d->set_idx(dimensions[i].second);
}
return Put(tid, pid, time, base::Slice(value), &pb_dimensions);
return Put(tid, pid, time, base::Slice(value), &pb_dimensions, memory_usage_limit);
}

bool TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Slice& value,
::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>* dimensions) {
base::Status TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Slice& value,
::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>* dimensions,
int memory_usage_limit) {
::openmldb::api::PutRequest request;
if (memory_usage_limit < 0 || memory_usage_limit > 100) {
return {base::ReturnCode::kError, absl::StrCat("invalid memory_usage_limit ", memory_usage_limit)};
} else if (memory_usage_limit > 0) {
request.set_memory_limit(memory_usage_limit);

}
request.set_time(time);
request.set_value(value.data(), value.size());
request.set_tid(tid);
request.set_pid(pid);
request.mutable_dimensions()->Swap(dimensions);
::openmldb::api::PutResponse response;
bool ok =
client_.SendRequest(&::openmldb::api::TabletServer_Stub::Put, &request, &response, FLAGS_request_timeout_ms, 1);
if (ok && response.code() == 0) {
return true;
auto st = client_.SendRequestSt(&::openmldb::api::TabletServer_Stub::Put,
&request, &response, FLAGS_request_timeout_ms, 1);
if (!st.OK()) {
return st;

}
LOG(WARNING) << "fail to send write request for " << response.msg() << " and error code " << response.code();
return false;
return {response.code(), response.msg()};
}
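
The client-side handling of `memory_usage_limit` in the new `Put` overload (reject values outside 0-100, omit the field for 0, forward anything else) can be isolated as a small pure function. This is a hedged sketch only: `LimitCheck` and `NormalizeMemoryUsageLimit` are hypothetical names standing in for `base::Status` and the request setter, which are not reproduced here.

```cpp
#include <cassert>
#include <string>

// Hypothetical result type standing in for base::Status plus the request field.
struct LimitCheck {
    bool ok = false;             // false when the argument is rejected
    bool set_on_request = false; // true when the limit should be sent to the server
    int wire_value = 0;          // value to send, valid only if set_on_request
    std::string error;           // message for the rejected case
};

// Mirrors the branching shown in the diff: <0 or >100 is an error,
// 0 leaves the request field unset (unlimited), 1-100 is forwarded as-is.
inline LimitCheck NormalizeMemoryUsageLimit(int memory_usage_limit) {
    LimitCheck result;
    if (memory_usage_limit < 0 || memory_usage_limit > 100) {
        result.error = "invalid memory_usage_limit " + std::to_string(memory_usage_limit);
        return result;
    }
    result.ok = true;
    if (memory_usage_limit > 0) {
        result.set_on_request = true;
        result.wire_value = memory_usage_limit;
    }
    return result;
}
```

Keeping the field unset for 0 means a request without a limit looks the same on the wire as one from a caller that never passed the argument, which appears to be the intent of the `else if (memory_usage_limit > 0)` branch.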

bool TabletClient::Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, const std::string& value) {
base::Status TabletClient::Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time,
const std::string& value) {
::openmldb::api::PutRequest request;
auto dim = request.add_dimensions();
dim->set_key(pk);
Expand All @@ -240,14 +247,12 @@
request.set_tid(tid);
request.set_pid(pid);
::openmldb::api::PutResponse response;

bool ok =
client_.SendRequest(&::openmldb::api::TabletServer_Stub::Put, &request, &response, FLAGS_request_timeout_ms, 1);
if (ok && response.code() == 0) {
return true;
auto st = client_.SendRequestSt(&::openmldb::api::TabletServer_Stub::Put,
&request, &response, FLAGS_request_timeout_ms, 1);
if (!st.OK()) {
return st;

}
LOG(WARNING) << "fail to put for error " << response.msg();
return false;
return {response.code(), response.msg()};
}

bool TabletClient::MakeSnapshot(uint32_t tid, uint32_t pid, uint64_t offset, std::shared_ptr<TaskInfo> task_info) {