diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 4a135f9dd39..4babfd8f55e 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -30,7 +30,7 @@ on: jobs: coverage: - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 env: CTEST_PARALLEL_LEVEL: 1 # parallel test level for ctest (make test) CMAKE_BUILD_TYPE: Debug diff --git a/src/sdk/db_sdk.cc b/src/sdk/db_sdk.cc index 39f791e9602..27d56901027 100644 --- a/src/sdk/db_sdk.cc +++ b/src/sdk/db_sdk.cc @@ -194,12 +194,19 @@ ClusterSDK::~ClusterSDK() { } void ClusterSDK::CheckZk() { - if (session_id_ == 0) { - WatchNotify(); - } else if (session_id_ != zk_client_->GetSessionTerm()) { - LOG(WARNING) << "session changed, re-watch notify"; - WatchNotify(); + // ensure that zk client is alive + if (zk_client_->EnsureConnected()) { + if (session_id_ == 0) { + WatchNotify(); + } else if (session_id_ != zk_client_->GetSessionTerm()) { + LOG(WARNING) << "session changed, re-watch notify"; + WatchNotify(); + } + } else { + // 5min print once + LOG_EVERY_N(WARNING, 150) << "zk client is not connected, reconnect later"; } + pool_.DelayTask(2000, [this] { CheckZk(); }); } @@ -380,7 +387,7 @@ bool ClusterSDK::InitTabletClient() { std::vector tablets; bool ok = zk_client_->GetNodes(tablets); if (!ok) { - LOG(WARNING) << "fail to get tablet"; + LOG(WARNING) << "fail to get tablets from zk"; return false; } std::map real_ep_map; diff --git a/src/sdk/db_sdk.h b/src/sdk/db_sdk.h index 48bb1ea80ab..88a9fd8a62d 100644 --- a/src/sdk/db_sdk.h +++ b/src/sdk/db_sdk.h @@ -170,7 +170,8 @@ class ClusterSDK : public DBSDK { std::string globalvar_changed_notify_path_; std::string leader_path_; std::string taskmanager_leader_path_; - + // CheckZk will be called periodically, so we don't need to check zk_client_ before using it + // if failed, just retry ::openmldb::zk::ZkClient* zk_client_; ::baidu::common::ThreadPool pool_; }; diff --git a/src/zk/zk_client.cc b/src/zk/zk_client.cc index e1044db73ec..8bbd265071d 100644 --- a/src/zk/zk_client.cc +++ b/src/zk/zk_client.cc @@ -55,7 +55,7 @@ void NodeWatcher(zhandle_t* zh, int type, int state, const char* path, void* wat } void ItemWatcher(zhandle_t* zh, int type, int state, const char* path, void* watcher_ctx) { - PDLOG(INFO, "node watcher with event type %d, state %d", type, state); + PDLOG(INFO, "item watcher with event type %d, state %d", type, state); if (zoo_get_context(zh)) { ZkClient* client = const_cast(reinterpret_cast(zoo_get_context(zh))); std::string path_str(path); @@ -565,8 +565,13 @@ void ZkClient::LogEvent(int type, int state, const char* path) { if (type == ZOO_SESSION_EVENT) { if (state == ZOO_CONNECTED_STATE) { Connected(); + } else if (state == ZOO_CONNECTING_STATE || state == ZOO_ASSOCIATING_STATE) { + // just wait } else if (state == ZOO_EXPIRED_SESSION_STATE) { connected_ = false; + } else { + // unknow state, should retry + connected_ = false; } } } @@ -612,5 +617,18 @@ bool ZkClient::Mkdir(const std::string& path) { return MkdirNoLock(path); } +bool ZkClient::EnsureConnected() { + if (!IsConnected()) { + LOG(WARNING) << "reconnect zk"; + if (Reconnect()) { + LOG(INFO) << "reconnect zk ok"; + } else { + LOG(WARNING) << "reconnect zk failed"; + return false; + } + } + return true; +} + } // namespace zk } // namespace openmldb diff --git a/src/zk/zk_client.h b/src/zk/zk_client.h index e06c0de7e6a..19f86d3f631 100644 --- a/src/zk/zk_client.h +++ b/src/zk/zk_client.h @@ -136,6 +136,11 @@ class ZkClient { // when reconnect, need Register and Watchnodes again bool Reconnect(); + // ensure that zk client is connected: + // if not, try to reconnect, return false if reconnect failed + // DON'T use zk client if this function return false + bool EnsureConnected(); + private: void Connected();