diff --git a/src/sdk/db_sdk.cc b/src/sdk/db_sdk.cc index 0f551853740..a8b08e10259 100644 --- a/src/sdk/db_sdk.cc +++ b/src/sdk/db_sdk.cc @@ -195,12 +195,19 @@ ClusterSDK::~ClusterSDK() { } void ClusterSDK::CheckZk() { - if (session_id_ == 0) { - WatchNotify(); - } else if (session_id_ != zk_client_->GetSessionTerm()) { - LOG(WARNING) << "session changed, re-watch notify"; - WatchNotify(); + // make sure the zk client is connected (EnsureConnected tries to reconnect) before checking the session + if (zk_client_->EnsureConnected()) { + if (session_id_ == 0) { + WatchNotify(); + } else if (session_id_ != zk_client_->GetSessionTerm()) { + LOG(WARNING) << "session changed, re-watch notify"; + WatchNotify(); + } + } else { + // throttled to every 150th occurrence; CheckZk is rescheduled with a delay of 2000 (presumably ms), i.e. roughly once per 5 minutes + LOG_EVERY_N(WARNING, 150) << "zk client is not connected, reconnect later"; } + pool_.DelayTask(2000, [this] { CheckZk(); }); } @@ -383,7 +390,7 @@ bool ClusterSDK::InitTabletClient() { std::vector tablets; bool ok = zk_client_->GetNodes(tablets); if (!ok) { - LOG(WARNING) << "fail to get tablet"; + LOG(WARNING) << "fail to get tablets from zk"; return false; } std::map real_ep_map; diff --git a/src/sdk/db_sdk.h b/src/sdk/db_sdk.h index c6d2cfbab76..2d8a4ab2f38 100644 --- a/src/sdk/db_sdk.h +++ b/src/sdk/db_sdk.h @@ -174,7 +174,8 @@ class ClusterSDK : public DBSDK { std::string globalvar_changed_notify_path_; std::string leader_path_; std::string taskmanager_leader_path_; - + // CheckZk is called periodically and reconnects zk_client_ when needed, so the connection is not checked before each use; + // if an operation on zk_client_ fails, just retry it later ::openmldb::zk::ZkClient* zk_client_; ::baidu::common::ThreadPool pool_; }; diff --git a/src/zk/zk_client.cc b/src/zk/zk_client.cc index ecc94c1251c..070acf8d22f 100644 --- a/src/zk/zk_client.cc +++ b/src/zk/zk_client.cc @@ -55,7 +55,7 @@ void NodeWatcher(zhandle_t* zh, int type, int state, const char* path, void* wat } void ItemWatcher(zhandle_t* zh, int type, int state, const char* path, void* watcher_ctx) { - PDLOG(INFO, "node watcher with event type %d, state %d", type, state); + PDLOG(INFO, "item watcher 
with event type %d, state %d", type, state); if (zoo_get_context(zh)) { ZkClient* client = const_cast(reinterpret_cast(zoo_get_context(zh))); std::string path_str(path); @@ -583,8 +583,13 @@ void ZkClient::LogEvent(int type, int state, const char* path) { if (type == ZOO_SESSION_EVENT) { if (state == ZOO_CONNECTED_STATE) { Connected(); + } else if(state == ZOO_CONNECTING_STATE || state == ZOO_ASSOCIATING_STATE) { + // connection still in progress: leave connected_ unchanged and just wait } else if (state == ZOO_EXPIRED_SESSION_STATE) { connected_ = false; + } else { + // unknown state: mark the client as disconnected so that a reconnect is retried + connected_ = false; } } } @@ -630,5 +635,18 @@ bool ZkClient::Mkdir(const std::string& path) { return MkdirNoLock(path); } +bool ZkClient::EnsureConnected() { + if (!IsConnected()) { + LOG(WARNING) << "reconnect zk"; + if (Reconnect()) { + LOG(INFO) << "reconnect zk ok"; + } else { + LOG(WARNING) << "reconnect zk failed"; + return false; + } + } + return true; +} + } // namespace zk } // namespace openmldb diff --git a/src/zk/zk_client.h b/src/zk/zk_client.h index 344df5753e2..9636c963f3b 100644 --- a/src/zk/zk_client.h +++ b/src/zk/zk_client.h @@ -138,6 +138,11 @@ class ZkClient { // when reconnect, need Register and Watchnodes again bool Reconnect(); + // Ensure that the zk client is connected: + // if it is not, try to reconnect; returns false if the reconnect failed. + // DON'T use the zk client if this function returns false. + bool EnsureConnected(); + private: void Connected();