Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(v068-patch): zk reconnect in db sdk #3655

Merged
merged 4 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ on:

jobs:
coverage:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
env:
CTEST_PARALLEL_LEVEL: 1 # parallel test level for ctest (make test)
CMAKE_BUILD_TYPE: Debug
Expand Down
19 changes: 13 additions & 6 deletions src/sdk/db_sdk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,19 @@ ClusterSDK::~ClusterSDK() {
}

void ClusterSDK::CheckZk() {
if (session_id_ == 0) {
WatchNotify();
} else if (session_id_ != zk_client_->GetSessionTerm()) {
LOG(WARNING) << "session changed, re-watch notify";
WatchNotify();
// ensure that zk client is alive
if (zk_client_->EnsureConnected()) {
if (session_id_ == 0) {
WatchNotify();
} else if (session_id_ != zk_client_->GetSessionTerm()) {
LOG(WARNING) << "session changed, re-watch notify";
WatchNotify();
}
} else {
// log roughly once every 5 minutes (every 150th call, with CheckZk rescheduled every 2000 ms)
LOG_EVERY_N(WARNING, 150) << "zk client is not connected, reconnect later";
}

pool_.DelayTask(2000, [this] { CheckZk(); });
}

Expand Down Expand Up @@ -380,7 +387,7 @@ bool ClusterSDK::InitTabletClient() {
std::vector<std::string> tablets;
bool ok = zk_client_->GetNodes(tablets);
if (!ok) {
LOG(WARNING) << "fail to get tablet";
LOG(WARNING) << "fail to get tablets from zk";
return false;
}
std::map<std::string, std::string> real_ep_map;
Expand Down
3 changes: 2 additions & 1 deletion src/sdk/db_sdk.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ class ClusterSDK : public DBSDK {
std::string globalvar_changed_notify_path_;
std::string leader_path_;
std::string taskmanager_leader_path_;

// CheckZk will be called periodically, so we don't need to check zk_client_ before using it
// if failed, just retry
::openmldb::zk::ZkClient* zk_client_;
::baidu::common::ThreadPool pool_;
};
Expand Down
20 changes: 19 additions & 1 deletion src/zk/zk_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void NodeWatcher(zhandle_t* zh, int type, int state, const char* path, void* wat
}

void ItemWatcher(zhandle_t* zh, int type, int state, const char* path, void* watcher_ctx) {
PDLOG(INFO, "node watcher with event type %d, state %d", type, state);
PDLOG(INFO, "item watcher with event type %d, state %d", type, state);
if (zoo_get_context(zh)) {
ZkClient* client = const_cast<ZkClient*>(reinterpret_cast<const ZkClient*>(zoo_get_context(zh)));
std::string path_str(path);
Expand Down Expand Up @@ -565,8 +565,13 @@ void ZkClient::LogEvent(int type, int state, const char* path) {
if (type == ZOO_SESSION_EVENT) {
if (state == ZOO_CONNECTED_STATE) {
Connected();
} else if (state == ZOO_CONNECTING_STATE || state == ZOO_ASSOCIATING_STATE) {
// just wait
} else if (state == ZOO_EXPIRED_SESSION_STATE) {
connected_ = false;
} else {
// unknown state, should retry
connected_ = false;
}
}
}
Expand Down Expand Up @@ -612,5 +617,18 @@ bool ZkClient::Mkdir(const std::string& path) {
return MkdirNoLock(path);
}

bool ZkClient::EnsureConnected() {
if (!IsConnected()) {
LOG(WARNING) << "reconnect zk";
if (Reconnect()) {
LOG(INFO) << "reconnect zk ok";
} else {
LOG(WARNING) << "reconnect zk failed";
return false;
}
}
return true;
}

} // namespace zk
} // namespace openmldb
5 changes: 5 additions & 0 deletions src/zk/zk_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ class ZkClient {
// when reconnect, need Register and Watchnodes again
bool Reconnect();

// Ensure that the zk client is connected:
// if it is not, try to reconnect; returns false if the reconnect fails.
// DON'T use the zk client if this function returns false.
bool EnsureConnected();

private:
void Connected();

Expand Down
Loading