diff --git a/src/yb/master/xcluster/xcluster_safe_time_service-test.cc b/src/yb/master/xcluster/xcluster_safe_time_service-test.cc index bfec9a449a50..954d6953e649 100644 --- a/src/yb/master/xcluster/xcluster_safe_time_service-test.cc +++ b/src/yb/master/xcluster/xcluster_safe_time_service-test.cc @@ -295,11 +295,6 @@ TEST_F(XClusterSafeTimeServiceTest, ComputeSafeTimeWithFilters) { ASSERT_EQ(safe_time_service.safe_time_map_[db2], ht_invalid); ASSERT_EQ(safe_time_service.entries_to_delete_.size(), 0); - ASSERT_NOK(safe_time_service.GetXClusterSafeTimeForNamespace( - dummy_leader_term + 1, db1, XClusterSafeTimeFilter::NONE)); - ASSERT_NOK(safe_time_service.GetXClusterSafeTimeForNamespace( - dummy_leader_term - 1, db1, XClusterSafeTimeFilter::NONE)); - auto db1_none = ASSERT_RESULT(GetXClusterSafeTimeWithNoFilter(safe_time_service, db1)); auto db1_ddlqueue = ASSERT_RESULT(GetXClusterSafeTimeFilterOutDdlQueue(safe_time_service, db1)); auto db2_none = ASSERT_RESULT(GetXClusterSafeTimeWithNoFilter(safe_time_service, db2)); diff --git a/src/yb/master/xcluster/xcluster_safe_time_service.cc b/src/yb/master/xcluster/xcluster_safe_time_service.cc index 090e5d4d8dd1..fcb8b2c6376c 100644 --- a/src/yb/master/xcluster/xcluster_safe_time_service.cc +++ b/src/yb/master/xcluster/xcluster_safe_time_service.cc @@ -138,26 +138,35 @@ void XClusterSafeTimeService::ProcessTaskPeriodically() { } auto leader_term_result = GetLeaderTermFromCatalogManager(); - if (!leader_term_result.ok()) { - VLOG_WITH_FUNC(1) << "Going into idle mode due to master leader change"; - EnterIdleMode("master leader change"); - return; - } - int64_t leader_term = leader_term_result.get(); + if (leader_term_result.ok()) { + // Compute safe time now and also update the metrics. + bool further_computation_needed = true; + auto result = ComputeSafeTime(*leader_term_result, /* update_metrics */ true); + if (result.ok()) { + further_computation_needed = result.get(); + } else { + LOG(WARNING) << "Failure in XClusterSafeTime task: " << result; + } - // Compute safe time now and also update the metrics. - bool further_computation_needed = true; - auto result = ComputeSafeTime(leader_term, /* update_metrics */ true); - if (result.ok()) { - further_computation_needed = result.get(); + if (!further_computation_needed) { + VLOG_WITH_FUNC(1) << "Going into idle mode due to lack of work"; + EnterIdleMode("no more work left"); + return; + } } else { - LOG(WARNING) << "Failure in XClusterSafeTime task: " << result; - } + // We can fail to get the term due to transient errors like a stale lease. In these cases we can + // recover without losing the leadership or changing the term. So check again to see if we are + // the leader. + auto leader_status = catalog_manager_->CheckIsLeaderAndReady(); + if (!leader_status.ok()) { + LOG_WITH_FUNC(INFO) << "Going into idle mode due to master leader change: " << leader_status; + EnterIdleMode("master leader change"); + return; + } - if (!further_computation_needed) { - VLOG_WITH_FUNC(1) << "Going into idle mode due to lack of work"; - EnterIdleMode("no more work left"); - return; + YB_LOG_EVERY_N_SECS_OR_VLOG(INFO, 60, 1) + << "Skip xCluster safe time computation since this is not a healthy master leader: " + << leader_term_result.status(); } // Delay before before running the task again. @@ -229,7 +238,6 @@ Result XClusterSafeTimeService::GetXClusterSafeTimeForNamespace( const XClusterSafeTimeFilter& filter) { SharedLock lock(mutex_); SCHECK(safe_time_table_ready_, IllegalState, "Safe time table is not ready yet."); - SCHECK_EQ(leader_term_, leader_term, IllegalState, "Received unexpected leader term"); const XClusterNamespaceToSafeTimeMap& safe_time_map = VERIFY_RESULT(GetFilteredXClusterSafeTimeMap(filter)); @@ -477,7 +485,6 @@ Result XClusterSafeTimeService::ComputeSafeTime( // and setting the new config. Its important to make sure that the config we persist is accurate // as only that protects the safe time from going backwards. RETURN_NOT_OK(SetXClusterSafeTime(leader_term, namespace_safe_time_map)); - leader_term_ = leader_term; if (update_metrics) { // Update the metrics using the newly computed maps. diff --git a/src/yb/master/xcluster/xcluster_safe_time_service.h b/src/yb/master/xcluster/xcluster_safe_time_service.h index 068f5eb47918..d6ab0ed2b13d 100644 --- a/src/yb/master/xcluster/xcluster_safe_time_service.h +++ b/src/yb/master/xcluster/xcluster_safe_time_service.h @@ -154,7 +154,6 @@ class XClusterSafeTimeService { std::unique_ptr safe_time_table_; - int64_t leader_term_ GUARDED_BY(mutex_); int32_t cluster_config_version_ GUARDED_BY(mutex_); std::map producer_tablet_namespace_map_ GUARDED_BY(mutex_);