Skip to content

Commit

Permalink
Support setting spark_config and using it when sending requests to the taskmanager
Browse files Browse the repository at this point in the history
  • Loading branch information
tobegit3hub committed Nov 21, 2023
1 parent 72f752b commit 7c342dc
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
61 changes: 60 additions & 1 deletion src/sdk/sql_cluster_router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
#include <algorithm>
#include <fstream>
#include <future>
#include <iostream>
#include <memory>
#include <string>
#include <sstream>
#include <unordered_map>
#include <utility>

Expand Down Expand Up @@ -319,6 +321,7 @@ bool SQLClusterRouter::Init() {
session_variables_.emplace("enable_trace", "false");
session_variables_.emplace("sync_job", "false");
session_variables_.emplace("job_timeout", "60000"); // rpc request timeout for taskmanager
session_variables_.emplace("spark_config", "");
}
return true;
}
Expand Down Expand Up @@ -2980,7 +2983,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteOfflineQuery(
bool is_sync_job, int job_timeout,
::hybridse::sdk::Status* status) {
RET_IF_NULL_AND_WARN(status, "output status is nullptr");
std::map<std::string, std::string> config;
std::map<std::string, std::string> config = ParseSparkConfigString(GetSparkConfig());

Check warning on line 2986 in src/sdk/sql_cluster_router.cc

View check run for this annotation

Codecov / codecov/patch

src/sdk/sql_cluster_router.cc#L2986

Added line #L2986 was not covered by tests
ReadSparkConfFromFile(std::dynamic_pointer_cast<SQLRouterOptions>(options_)->spark_conf_path, &config);

if (is_sync_job) {
Expand Down Expand Up @@ -3049,6 +3052,16 @@ int SQLClusterRouter::GetJobTimeout() {
return 60000;
}

// Returns the current value of the "spark_config" session variable.
// The lookup is done while holding mu_; an empty string is returned when the
// variable is not present in session_variables_.
std::string SQLClusterRouter::GetSparkConfig() {
    std::lock_guard<::openmldb::base::SpinMutex> lock(mu_);
    const auto entry = session_variables_.find("spark_config");
    return entry == session_variables_.end() ? std::string() : entry->second;
}

::hybridse::sdk::Status SQLClusterRouter::SetVariable(hybridse::node::SetPlanNode* node) {
std::string key = node->Key();
std::transform(key.begin(), key.end(), key.begin(), ::tolower);
Expand Down Expand Up @@ -3083,13 +3096,31 @@ ::hybridse::sdk::Status SQLClusterRouter::SetVariable(hybridse::node::SetPlanNod
if (!absl::SimpleAtoi(value, &new_timeout)) {
return {StatusCode::kCmdError, "Fail to parse value, can't set the request timeout"};
}
} else if (key == "spark_config") {
if(!CheckSparkConfigString(value)) {

Check warning on line 3100 in src/sdk/sql_cluster_router.cc

View workflow job for this annotation

GitHub Actions / cpplint

[cpplint] reported by reviewdog 🐶 Missing space before ( in if( [whitespace/parens] [5] Raw Output: src/sdk/sql_cluster_router.cc:3100: Missing space before ( in if( [whitespace/parens] [5]
return {StatusCode::kCmdError, "Fail to parse spark config, set like 'spark.executor.memory=2g;spark.executor.cores=2'"};

Check warning on line 3101 in src/sdk/sql_cluster_router.cc

View workflow job for this annotation

GitHub Actions / cpplint

[cpplint] reported by reviewdog 🐶 Lines should be <= 120 characters long [whitespace/line_length] [2] Raw Output: src/sdk/sql_cluster_router.cc:3101: Lines should be <= 120 characters long [whitespace/line_length] [2]

Check warning on line 3101 in src/sdk/sql_cluster_router.cc

View check run for this annotation

Codecov / codecov/patch

src/sdk/sql_cluster_router.cc#L3099-L3101

Added lines #L3099 - L3101 were not covered by tests
}
} else {
return {};
}
session_variables_[key] = value;
return {};
}

// Validates a spark config string before it is stored as a session variable.
//
// The expected format is a ';'-separated list of "key=value" segments whose keys start
// with "spark.", e.g. "spark.executor.memory=2g;spark.executor.cores=2".
//
// Returns true when every segment starts with "spark." and contains a '=' separator.
// An empty input has no segments and is therefore accepted.
//
// Segments without '=' are rejected here because ParseSparkConfigString can not turn them
// into a key-value pair and would drop them (only printing a message to stderr), so the
// setting would otherwise be lost without the caller noticing.
bool SQLClusterRouter::CheckSparkConfigString(const std::string& input) {
    std::istringstream iss(input);
    std::string key_value;

    while (std::getline(iss, key_value, ';')) {
        // rfind(prefix, 0) can only match at position 0, i.e. it is a "starts with" test
        if (key_value.rfind("spark.", 0) != 0) {
            return false;
        }
        // the prefix itself has no '=', so any '=' found leaves a key that begins with "spark."
        if (key_value.find('=') == std::string::npos) {
            return false;
        }
    }

    return true;
}

::hybridse::sdk::Status SQLClusterRouter::ParseNamesFromArgs(const std::string& db,
const std::vector<std::string>& args, std::string* db_name, std::string* name) {
if (args.size() == 1) {
Expand Down Expand Up @@ -4523,6 +4554,34 @@ bool SQLClusterRouter::CheckTableStatus(const std::string& db, const std::string
return check_succeed;
}

// Converts a ';'-separated list of "key=value" segments into a map.
//
// Each segment is split at its first '='. A segment without '=' or with a key that does
// not start with "spark." is reported on stderr and left out of the result. When a key
// appears more than once, the last value wins.
std::map<std::string, std::string> SQLClusterRouter::ParseSparkConfigString(const std::string& input) {
    std::map<std::string, std::string> result;
    std::istringstream stream(input);

    for (std::string segment; std::getline(stream, segment, ';');) {
        const size_t sep = segment.find('=');
        if (sep == std::string::npos) {
            std::cerr << "Error: Invalid key-value pair - " << segment << std::endl;
            continue;
        }

        const std::string name = segment.substr(0, sep);
        // rfind(prefix, 0) can only match at position 0, i.e. it is a "starts with" test
        if (name.rfind("spark.", 0) != 0) {
            std::cerr << "Error: Key does not start with 'spark.' - " << name << std::endl;
            continue;
        }

        result[name] = segment.substr(sep + 1);
    }

    return result;
}

void SQLClusterRouter::ReadSparkConfFromFile(std::string conf_file_path, std::map<std::string, std::string>* config) {
if (!conf_file_path.empty()) {
boost::property_tree::ptree pt;
Expand Down
6 changes: 6 additions & 0 deletions src/sdk/sql_cluster_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,12 @@ class SQLClusterRouter : public SQLRouter {
// get job timeout from the session variables, we will use the timeout when sending requests to the taskmanager
int GetJobTimeout();

// get the spark_config string from the session variables, an empty string is returned if it is not set
std::string GetSparkConfig();

// split a "k1=v1;k2=v2" string into a map, only the pairs whose key starts with "spark." are kept
std::map<std::string, std::string> ParseSparkConfigString(const std::string& input);

Check warning on line 289 in src/sdk/sql_cluster_router.h

View workflow job for this annotation

GitHub Actions / cpplint

[cpplint] reported by reviewdog 🐶 Line ends in whitespace. Consider deleting these extra spaces. [whitespace/end_of_line] [4] Raw Output: src/sdk/sql_cluster_router.h:289: Line ends in whitespace. Consider deleting these extra spaces. [whitespace/end_of_line] [4]
// check the value given for the spark_config variable before it is set, return false if it is not accepted
bool CheckSparkConfigString(const std::string& input);

::openmldb::base::Status ExecuteOfflineQueryAsync(const std::string& sql,
const std::map<std::string, std::string>& config,
const std::string& default_db, int job_timeout,
Expand Down

0 comments on commit 7c342dc

Please sign in to comment.